热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

实时计算之FlinkCDC

什么是CDCCDC是ChangeDataCapture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除

什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC的种类

CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:

基于查询的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

Flink-CDC

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
在这里插入图片描述

主要代码

//2.Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//2.1 开启Checkpoint,每隔5秒钟做一次CKenv.enableCheckpointing(5000L);//2.2 指定CK的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次CK数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从CK自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问HDFS的用户名System.setProperty("HADOOP_USER_NAME", "bigdata");//3.创建Flink-MySQL-CDC的SourceProperties properties = new Properties();//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.properties.setProperty("scan.startup.mode", "initial");DebeziumSourceFunction mysqlSource = MySQLSource.builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("test").tableList("test.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据
//注意:指定的时候需要使用"db.table"的方式.debeziumProperties(properties).deserializer(new StringDebeziumDeserializationSchema()).build();

给当前的Flink程序创建Savepoint

[bigdata@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save

FlinkSQL方式的应用

//2.创建Flink-MySQL-CDC的SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'test'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();

自定义反序列化器

public class Flink_CDCWithCustomerSchema {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.创建Flink-MySQL-CDC的SourceProperties properties = new Properties();//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.properties.setProperty("debezium.snapshot.mode", "initial");DebeziumSourceFunction mysqlSource = MySQLSource.builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("test").tableList("test.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.debeziumProperties(properties).deserializer(new DebeziumDeserializationSchema() { //自定义数据解析器@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//获取主题信息,包含着数据库和表名 mysql_binlog_source.test.z_user_infoString topic = sourceRecord.topic();String[] arr = topic.split("\\.");String db = arr[1];String tableName = arr[2];//获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);//获取值信息并转换为Struct类型Struct value = (Struct) sourceRecord.value();//获取变化后的数据Struct after = value.getStruct("after");//创建JSON对象用于存储数据信息JSONObject data = new JSONObject();for (Field field : after.schema().fields()) {Object o = after.get(field);data.put(field.name(), o);}//创建JSON对象用于封装最终返回值数据信息JSONObject result = new JSONObject();result.put("operation", operation.toString().toLowerCase());result.put("data", data);result.put("database", db);result.put("table", tableName);//发送数据至下游collector.collect(result.toJSONString());}@Overridepublic TypeInformation getProducedType() {return TypeInformation.of(String.class);}}).build();//3.使用CDC Source从MySQL读取数据DataStreamSource mysqlDS = env.addSource(mysqlSource);//4.打印数据mysqlDS.print();//5.执行任务env.execute();}
}


推荐阅读
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 本文介绍了在Linux下安装Perl的步骤,并提供了一个简单的Perl程序示例。同时,还展示了运行该程序的结果。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 开发笔记:实验7的文件读写操作
    本文介绍了使用C++的ofstream和ifstream类进行文件读写操作的方法,包括创建文件、写入文件和读取文件的过程。同时还介绍了如何判断文件是否成功打开和关闭文件的方法。通过本文的学习,读者可以了解如何在C++中进行文件读写操作。 ... [详细]
author-avatar
手机用户2502870143_120
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有