热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink实现Mysql的数据同步MysqlSinkMysql

FlinkSinkMysq

Flink Sink Mysql 简单实现

  • 环境
  • 需求
  • 解析
  • 完整代码
  • 源码下载
  • 附:flink-cdc

环境
组件版本
scala2.12
netcat*
kafka*
mysql*
flink1.13.3
需求

监听mysql某个表的动态,实时同步到另一个数据库中。
当然使用maxwell或canal也可以实现同样的效果。这里只是简单演示
解析

创建环境

//创建环境
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

创建表user01,mysql-cdc是第三方连接器

//创建表user01 - 使用 mysql-cdc connector
tEnv.executeSql(
"""
|create table user01 (
|id int ,
|name string,
|PRIMARY KEY (id) NOT ENFORCED
|)with(
|'connector' = 'mysql-cdc',
|'hostname' = 'server120',
|'port' = '3306',
|'username' = 'flink_test',
|'password' = 'flink_test',
|'database-name' = 'flink_test',
|'table-name' = 'user01',
|'scan.incremental.snapshot.enabled' = 'false'
|)
|"""
.stripMargin)

创建表user02,jdbc 是flink自带的连接器

//创建表user02 - 使用jdbc connector
tEnv.executeSql(
"""
|create table user02 (
|id int PRIMARY KEY,
|name string
|)with(
'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://server120:3306/flink_test',
| 'table-name' = 'user02',
| 'username' = 'flink_test',
| 'password' = 'flink_test'
|)
|"""
.stripMargin)

将user01同步到user02

//将user01同步到user02
tEnv.from("user01").executeInsert("user02")

这种方式简单的实现了实时同步mysql某个表的增删改查。
完整代码

package com.z.tableapi
import org.apache.flink.table.api._
/**
* @Author wenz.ma
* @Date 2021/10/27 17:52
* @Desc cdc 实时同步mysql表数据
*/

object Mysql2MysqlWithCDC {
def main(args: Array[String]): Unit = {
//创建环境
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)
//创建表user01 - 使用 mysql-cdc connector
tEnv.executeSql(
"""
|create table user01 (
|id int ,
|name string,
|PRIMARY KEY (id) NOT ENFORCED
|)with(
|'connector' = 'mysql-cdc',
|'hostname' = 'server120',
|'port' = '3306',
|'username' = 'flink_test',
|'password' = 'flink_test',
|'database-name' = 'flink_test',
|'table-name' = 'user01',
|'scan.incremental.snapshot.enabled' = 'false'
|)
|"""
.stripMargin)
//创建表user02 - 使用jdbc connector
tEnv.executeSql(
"""
|create table user02 (
|id int PRIMARY KEY,
|name string
|)with(
'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://server120:3306/flink_test',
| 'table-name' = 'user02',
| 'username' = 'flink_test',
| 'password' = 'flink_test'
|)
|"""
.stripMargin)
//将user01同步到user02
tEnv.from("user01").executeInsert("user02")
}
}

flink mysql 依赖

<dependency>
<groupId>com.ververicagroupId>
<artifactId>flink-connector-mysql-cdcartifactId>
<version>${mysql.cdc}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-jdbc_${scala.version}artifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>5.1.47version>
dependency>

flink 依赖

<dependencies>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_${scala.version}artifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-scala_${scala.version}artifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_${scala.version}artifactId>
<version>${flink.version}version>
dependency>
dependencies>
源码下载

https://download.csdn.net/download/sinat_25528181/44038825
hive-catalog-demo
在这里插入图片描述

附:flink-cdc

目前有mysql-cdc、postgres-cdc、MongoDB-cdc、oracle-cdc

Github官网:https://github.com/ververica/flink-cdc-connectors
在这里插入图片描述


推荐阅读
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • kafka教程基本概念
    kafka教程基本概念 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 本文介绍了通过mysql命令查看mysql的安装路径的方法,提供了相应的sql语句,并希望对读者有参考价值。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 我正在使用sql-serverkafka-connect和debezium监视sqlserver数据库,但是当我发布并运行我的wo ... [详细]
author-avatar
嗳沫沫情深
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有