热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

实时计算之FlinkCDC

什么是CDCCDC是ChangeDataCapture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除

什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC的种类

CDC主要分为基于查询和基于Binlog两种方式,我们主要了解一下这两种之间的区别:

基于查询的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

Flink-CDC

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
在这里插入图片描述

主要代码

//2.Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序//2.1 开启Checkpoint,每隔5秒钟做一次CKenv.enableCheckpointing(5000L);//2.2 指定CK的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次CK数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从CK自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问HDFS的用户名System.setProperty("HADOOP_USER_NAME", "bigdata");//3.创建Flink-MySQL-CDC的SourceProperties properties = new Properties();//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.properties.setProperty("scan.startup.mode", "initial");DebeziumSourceFunction mysqlSource = MySQLSource.builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("test").tableList("test.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据
//注意:指定的时候需要使用"db.table"的方式.debeziumProperties(properties).deserializer(new StringDebeziumDeserializationSchema()).build();

给当前的Flink程序创建Savepoint

[bigdata@hadoop102 flink-standalone]$ bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save

FlinkSQL方式的应用

//2.创建Flink-MySQL-CDC的SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'test'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();

自定义反序列化器

public class Flink_CDCWithCustomerSchema {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.创建Flink-MySQL-CDC的SourceProperties properties = new Properties();//initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.properties.setProperty("debezium.snapshot.mode", "initial");DebeziumSourceFunction mysqlSource = MySQLSource.builder().hostname("hadoop102").port(3306).username("root").password("000000").databaseList("test").tableList("test.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.debeziumProperties(properties).deserializer(new DebeziumDeserializationSchema() { //自定义数据解析器@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//获取主题信息,包含着数据库和表名 mysql_binlog_source.test.z_user_infoString topic = sourceRecord.topic();String[] arr = topic.split("\\.");String db = arr[1];String tableName = arr[2];//获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);//获取值信息并转换为Struct类型Struct value = (Struct) sourceRecord.value();//获取变化后的数据Struct after = value.getStruct("after");//创建JSON对象用于存储数据信息JSONObject data = new JSONObject();for (Field field : after.schema().fields()) {Object o = after.get(field);data.put(field.name(), o);}//创建JSON对象用于封装最终返回值数据信息JSONObject result = new JSONObject();result.put("operation", operation.toString().toLowerCase());result.put("data", data);result.put("database", db);result.put("table", tableName);//发送数据至下游collector.collect(result.toJSONString());}@Overridepublic TypeInformation getProducedType() {return TypeInformation.of(String.class);}}).build();//3.使用CDC Source从MySQL读取数据DataStreamSource mysqlDS = env.addSource(mysqlSource);//4.打印数据mysqlDS.print();//5.执行任务env.execute();}
}


推荐阅读
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 本文详细介绍了在 Android 7.1 系统中调整屏幕分辨率和默认音量设置的方法。针对系统默认音量过大的问题,提供了具体的步骤来降低系统、铃声、媒体和闹钟的默认音量,以提升用户体验。此外,还涵盖了如何通过系统设置或使用第三方工具来优化屏幕分辨率,确保设备显示效果更加清晰和流畅。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • sh cca175problem03evolveavroschema.sh ... [详细]
  • Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及流式消费变化数据的能力。应用场景近实时数据摄取Hudi支持插入、更新和删除数据的能力。您 ... [详细]
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
  • 不会搭建大数据平台,我被老板优化了...
    不会,搭建,大数,据,平台,我 ... [详细]
  • 数据仓库中基本概念
    一、数据仓库数据仓库(DataWarehouse)是一个面向主题的、集成的、稳定的且随时间变化的数据集合,用于支持管理人员的决策面向主题主题就是类型的意思。传统数 ... [详细]
  • 大数据开发笔记(一):HDFS介绍
    ✨大数据开发笔记推荐:大数据开发面试知识点总结_GoAI的博客-CSDN博客_大数据开发面试​本文详细介绍大数据hadoop生态圈各部分知识,包括不限 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 【数据结构与算法】——快速排序
    Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql)间进行数据的传递,可以将一个关系型数据库(例如:MySQL,O ... [详细]
author-avatar
手机用户2502870143_120
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有