作者:Ben_Design_114 | 来源:互联网 | 2023-06-23 10:15
FlinkStreamETL
0.功能说明 概括 :利用Flink实时统计Mysql数据库BinLog日志数据,并将流式数据注册为流表,利用Flink SQL将流表与Mysql的维表进行JOIN,最后将计算结果实时写入Greenplum/Mysql。
1.需求分析 1.1需求 实时统计各个地区会议室的空置率,预定率,并在前端看板上实时展示。源系统的数据库是Mysql
,它有三张表,分别是:t_meeting_info(会议室预定信息表)、t_meeting_location(属地表,维度表)、t_meeting_address(会议室属地表,维度表)。
1.2说明 t_meeting_info
表中的数据每时每刻都在更新数据,若通过JDBC
方式定时查询Mysql
,会给源系统数据库造成大量无形的压力,甚至会影响正常业务的使用,并且时效性也不高。需要在基本不影响Mysql
正常使用的情况下完成对增量数据的处理。
上面三张表的DDL
语句如下:
t_meeting_info(会议室预定信息表,这张表数据会实时更新) CREATE TABLE ` t_meeting_info` ( ` id` int ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键id' , ` meeting_code` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '会议业务唯一编号' , ` msite` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议名称' , ` mcontent` varchar ( 4096 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议内容' , ` attend_count` int ( 5 ) DEFAULT NULL COMMENT '参会人数' , ` type ` int ( 5 ) DEFAULT NULL COMMENT '会议类型 1 普通会议 2 融合会议 3 视频会议 4 电话会议' , ` status ` int ( 255 ) DEFAULT NULL COMMENT '会议状态 ' , ` address_id` int ( 11 ) DEFAULT NULL COMMENT '会议室id' , ` email` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人邮箱' , ` contact_tel` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '联系电话' , ` create_user_name` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人姓名' , ` create_user_id` varchar ( 100 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人工号' , ` creator_org` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人组织' , ` mstart_date` datetime DEFAULT NULL COMMENT '会议开始时间' , ` mend_date` datetime DEFAULT NULL COMMENT '会议结束时间' , ` create_time` datetime DEFAULT NULL COMMENT '创建时间' , ` update_user` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人' , ` update_time` datetime DEFAULT NULL COMMENT '更新时间' , ` company` int ( 10 ) DEFAULT NULL COMMENT '会议所在属地code' , ` sign_status` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '预留字段' , PRIMARY KEY ( ` id` ) USING BTREE , KEY ` t_meeting_info_meeting_code_index` ( ` meeting_code` ) USING BTREE , KEY ` t_meeting_info_address_id_index` ( ` address_id` ) USING BTREE , KEY ` t_meeting_info_create_user_id_index` ( ` create_user_id` ) ) ENGINE = InnoDB AUTO_INCREMENT = 65216 DEFAULT CHARSET = utf8 ROW_FORMAT= DYNAMIC COMMENT = '会议主表' ;
t_meeting_location(属地表,地区维表)
CREATE TABLE ` t_meeting_location` ( ` id` int ( 11 ) NOT NULL AUTO_INCREMENT , ` short_name` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地简称' , ` full_name` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地全称' , ` code` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '属地code' , ` region_id` int ( 11 ) DEFAULT NULL COMMENT '地区id' , ` create_user` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人' , ` update_user` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人' , ` create_time` datetime DEFAULT NULL COMMENT '创建时间' , ` update_time` datetime DEFAULT NULL COMMENT '更新时间' , PRIMARY KEY ( ` id` ) USING BTREE , UNIQUE KEY ` t_meeting_location_code_uindex` ( ` code` ) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 103 DEFAULT CHARSET = utf8 ROW_FORMAT= DYNAMIC COMMENT = '属地表' ;
t_meeting_address(会议室属地表,会议室维表)
CREATE TABLE ` t_meeting_address` ( ` id` int ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键id' , ` name` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室名称' , ` location` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '所在属地' , ` shared` int ( 3 ) DEFAULT NULL COMMENT '是否共享 0 默认不共享 1 全部共享 2 选择性共享' , ` cost` int ( 10 ) DEFAULT NULL COMMENT '每小时成本' , ` size` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室容量大小' , ` bvm_ip` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'BVM IP' , ` type ` int ( 2 ) DEFAULT NULL COMMENT '会议室类型 1 普通会议室 2 视频会议室' , ` create_time` datetime DEFAULT NULL COMMENT '创建时间' , ` create_user` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人' , ` update_time` datetime DEFAULT NULL COMMENT '更新时间' , ` update_user` varchar ( 255 ) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人' , ` status ` int ( 2 ) DEFAULT NULL COMMENT '是否启用 ,0 未启用 1已启用 2已删除' , ` order ` int ( 5 ) DEFAULT NULL COMMENT '排序' , ` approve` int ( 2 ) DEFAULT NULL COMMENT '是否审批 0 不审批 1 审批' , PRIMARY KEY ( ` id` ) USING BTREE , KEY ` t_meeting_address_location_index` ( ` location` ) USING BTREE , KEY ` order ` ( ` order ` ) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 554 DEFAULT CHARSET = utf8 ROW_FORMAT= DYNAMIC COMMENT = '会议室表' ;
2.实现方案 方案如下图所示:
利用Canal 监听Mysql
数据库的增量BinLog
日志数据(JSON格式
) 将增量日志数据作为Kafka 的生产者,Flink解析Kafka 的Topic
中的数据并消费 将计算后的流式数据(Stream)注册为Flink 中的表(Table) 最后利用Flink与t_meeting_location、t_meeting_address维表进行JOIN,将最终的结果写入数据库。 需要服务器:CentOS7,JDK8、Scala 2.12.6、Mysql、Canal、Flink1.9、Zookkeeper、Kafka
3.可视化方案 Tableau实时刷新Greenplum,FineBI也可以(秒级) DataV也可以每几秒刷新一次 Flink计算后的结果,写入到缓存,前端开发可视化组件进行展示(实时展示)。 4.项目地址 由于CSDN不方便粘贴图片,详细内容请见: FlinkStreamETL
https://github.com/liwei199411/FlinkStreamETL/tree/master
5.参考目录 [1].基于Spark Streaming + Canal + Kafka对Mysql增量数据实时进行监测分析
[2].Canal
[3].Canal 的 .NET 客户端
[4].如何基于MYSQL
做实时计算?
[5].基于Canal与Flink
实现数据实时增量同步(一)
[6].美团DB数据同步到数据仓库的架构与实践
[7].处理JSON
格式的日志数据,然后进行流式Join
[8].Flink
继续实践:从日志清洗到实时统计内容PV
等多个指标
[9].实时数据架构体系建设思路
[10].Flink` 流与维表的关联
[11].Flink DataStream流表与维表Join(
Async` I/O)
12. `flink 流表join mysql表
作者:岳过山丘
链接:https://www.jianshu.com/p/44583b98ecbb
13. `flink1.9 使用LookupableTableSource实现异步维表关联
作者:todd5167
链接:https://www.jianshu.com/p/7ebe1ec8aa7c
14. Flink异步之矛盾-锋利的Async I/O
作者:王知无
链接:https://www.jianshu.com/p/85ee258aa41f
15.Flink 的时间属性及原理解析
https://blog.csdn.net/zhengzhaoyang122/article/details/107352934?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-3
16.大屏数据可视化
https://yyhsong.github.io/iDataV/xiang.top/2020/03/05/基于Canal与Flink实现数据实时增量同步-一/)