Author: 嗳沫沫情深 | Source: Internet | 2023-07-09 12:51
A Simple Flink Sink to MySQL
- Environment
- Requirement
- Walkthrough
- Complete Code
- Source Download
- Appendix: flink-cdc
Environment

| Component | Version |
| --- | --- |
| scala | 2.12 |
| netcat | * |
| kafka | * |
| mysql | * |
| flink | 1.13.3 |
Requirement

Listen for changes on a MySQL table and synchronize them to another database in real time.
Maxwell or Canal could of course achieve the same effect; this is just a simple demonstration.
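The mysql-cdc connector reads the MySQL binlog, so the source server must run with row-format binary logging enabled and the connecting user needs replication privileges. A minimal sketch of the server settings and grants, assuming the article's flink_test user (host pattern and exact privilege set follow the connector's usual requirements, not this article):

```sql
-- Server side (my.cnf), required for CDC; restart MySQL after changing:
--   server-id     = 1
--   log-bin       = mysql-bin
--   binlog_format = ROW
-- Privileges typically required by a binlog-reading CDC client:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_test'@'%';
FLUSH PRIVILEGES;
```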
Walkthrough

Create the environment:
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)
Create the table user01; mysql-cdc is a third-party connector:
tEnv.executeSql(
  """
    |create table user01 (
    |  id int,
    |  name string,
    |  PRIMARY KEY (id) NOT ENFORCED
    |) with (
    |  'connector' = 'mysql-cdc',
    |  'hostname' = 'server120',
    |  'port' = '3306',
    |  'username' = 'flink_test',
    |  'password' = 'flink_test',
    |  'database-name' = 'flink_test',
    |  'table-name' = 'user01',
    |  'scan.incremental.snapshot.enabled' = 'false'
    |)
    |""".stripMargin)
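The Flink DDL above only declares a catalog view of the table; the physical table must already exist on the MySQL side. A hedged sketch of a matching table (the VARCHAR length is an assumption; the Flink string type maps to any MySQL character type):

```sql
-- Physical source table in the flink_test database, matching the
-- Flink schema (int id as primary key, string name):
CREATE TABLE user01 (
    id   INT PRIMARY KEY,
    name VARCHAR(255)
);
```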
Create the table user02; jdbc is a connector that ships with Flink:
tEnv.executeSql(
  """
    |create table user02 (
    |  id int PRIMARY KEY NOT ENFORCED,
    |  name string
    |) with (
    |  'connector' = 'jdbc',
    |  'url' = 'jdbc:mysql://server120:3306/flink_test',
    |  'table-name' = 'user02',
    |  'username' = 'flink_test',
    |  'password' = 'flink_test'
    |)
    |""".stripMargin)
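Like the source, the JDBC sink writes to a physical table that must be created in MySQL beforehand. Because the Flink table declares a primary key, the sink runs in upsert mode, so the MySQL table should carry the same key. A sketch, again with the VARCHAR length as an assumption:

```sql
-- Physical sink table; the PRIMARY KEY lets the Flink JDBC sink
-- write idempotent upserts (INSERT ... ON DUPLICATE KEY UPDATE):
CREATE TABLE user02 (
    id   INT PRIMARY KEY,
    name VARCHAR(255)
);
```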
Sync user01 into user02:
tEnv.from("user01").executeInsert("user02")
This simple approach synchronizes inserts, updates, and deletes on a MySQL table in real time.
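Once the job is running, each change type can be exercised on the source and observed on the sink. A quick check, assuming both tables live in the flink_test database:

```sql
-- Run against the source; user02 should mirror each change shortly after:
INSERT INTO user01 VALUES (1, 'alice'), (2, 'bob');
UPDATE user01 SET name = 'carol' WHERE id = 1;   -- becomes an upsert on user02
DELETE FROM user01 WHERE id = 2;                 -- row disappears from user02
```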
Complete Code
package com.z.tableapi

import org.apache.flink.table.api._

object Mysql2MysqlWithCDC {

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)

    tEnv.executeSql(
      """
        |create table user01 (
        |  id int,
        |  name string,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) with (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'server120',
        |  'port' = '3306',
        |  'username' = 'flink_test',
        |  'password' = 'flink_test',
        |  'database-name' = 'flink_test',
        |  'table-name' = 'user01',
        |  'scan.incremental.snapshot.enabled' = 'false'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |create table user02 (
        |  id int PRIMARY KEY NOT ENFORCED,
        |  name string
        |) with (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://server120:3306/flink_test',
        |  'table-name' = 'user02',
        |  'username' = 'flink_test',
        |  'password' = 'flink_test'
        |)
        |""".stripMargin)

    tEnv.from("user01").executeInsert("user02")
  }
}
Flink MySQL connector dependencies:
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>${mysql.cdc}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
Flink dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Source Download
https://download.csdn.net/download/sinat_25528181/44038825
Appendix: flink-cdc

Connectors currently available: mysql-cdc, postgres-cdc, mongodb-cdc, oracle-cdc.
GitHub: https://github.com/ververica/flink-cdc-connectors