热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink实时读取Mysql增量日志数据并写入GreenPlum/Mysql

FlinkStreamETL0.功能说明

FlinkStreamETL

0.功能说明

概括:利用Flink实时统计Mysql数据库BinLog日志数据,并将流式数据注册为流表,利用Flink SQL将流表与Mysql的维表进行JOIN,最后将计算结果实时写入Greenplum/Mysql。

1.需求分析

1.1需求

实时统计各个地区会议室的空置率,预定率,并在前端看板上实时展示。源系统的数据库是Mysql,它有三张表,分别是:t_meeting_info(会议室预定信息表)、t_meeting_location(属地表,维度表)、t_meeting_address(会议室属地表,维度表)。

1.2说明

t_meeting_info表中的数据每时每刻都在更新数据,若通过JDBC方式定时查询Mysql,会给源系统数据库造成大量无形的压力,甚至会影响正常业务的使用,并且时效性也不高。需要在基本不影响Mysql正常使用的情况下完成对增量数据的处理。

上面三张表的DDL语句如下:

  • t_meeting_info(会议室预定信息表,这张表数据会实时更新)

CREATE TABLE `t_meeting_info` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`meeting_code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '会议业务唯一编号',
`msite` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议名称',
`mcontent` varchar(4096) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议内容',
`attend_count` int(5) DEFAULT NULL COMMENT '参会人数',
`type` int(5) DEFAULT NULL COMMENT '会议类型 1 普通会议 2 融合会议 3 视频会议 4 电话会议',
`status` int(255) DEFAULT NULL COMMENT '会议状态 ',
`address_id` int(11) DEFAULT NULL COMMENT '会议室id',
`email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人邮箱',
`contact_tel` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '联系电话',
`create_user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人姓名',
`create_user_id` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人工号',
`creator_org` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人组织',
`mstart_date` datetime DEFAULT NULL COMMENT '会议开始时间',
`mend_date` datetime DEFAULT NULL COMMENT '会议结束时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
`company` int(10) DEFAULT NULL COMMENT '会议所在属地code',
`sign_status` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '预留字段',
PRIMARY KEY (`id`) USING BTREE,
KEY `t_meeting_info_meeting_code_index` (`meeting_code`) USING BTREE,
KEY `t_meeting_info_address_id_index` (`address_id`) USING BTREE,
KEY `t_meeting_info_create_user_id_index` (`create_user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=65216 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会议主表';

  • t_meeting_location(属地表,地区维表)

    CREATE TABLE `t_meeting_location` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `short_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地简称',
    `full_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '属地全称',
    `code` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '属地code',
    `region_id` int(11) DEFAULT NULL COMMENT '地区id',
    `create_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人',
    `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    `update_time` datetime DEFAULT NULL COMMENT '更新时间',
    PRIMARY KEY (`id`) USING BTREE,
    UNIQUE KEY `t_meeting_location_code_uindex` (`code`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=103 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='属地表';

  • t_meeting_address(会议室属地表,会议室维表)

    CREATE TABLE `t_meeting_address` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键id',
    `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室名称',
    `location` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '所在属地',
    `shared` int(3) DEFAULT NULL COMMENT '是否共享 0 默认不共享 1 全部共享 2 选择性共享',
    `cost` int(10) DEFAULT NULL COMMENT '每小时成本',
    `size` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '会议室容量大小',
    `bvm_ip` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT 'BVM IP',
    `type` int(2) DEFAULT NULL COMMENT '会议室类型 1 普通会议室 2 视频会议室',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    `create_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '创建人',
    `update_time` datetime DEFAULT NULL COMMENT '更新时间',
    `update_user` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '更新人',
    `status` int(2) DEFAULT NULL COMMENT '是否启用 ,0 未启用 1已启用 2已删除',
    `order` int(5) DEFAULT NULL COMMENT '排序',
    `approve` int(2) DEFAULT NULL COMMENT '是否审批 0 不审批 1 审批',
    PRIMARY KEY (`id`) USING BTREE,
    KEY `t_meeting_address_location_index` (`location`) USING BTREE,
    KEY `order` (`order`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=554 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='会议室表';

2.实现方案

方案如下图所示:

  • 利用Canal监听Mysql数据库的增量BinLog日志数据(JSON格式
  • 将增量日志数据作为Kafka的生产者,Flink解析KafkaTopic 中的数据并消费
  • 将计算后的流式数据(Stream)注册为Flink 中的表(Table)
  • 最后利用Flink与t_meeting_location、t_meeting_address维表进行JOIN,将最终的结果写入数据库。
    在这里插入图片描述

需要服务器:CentOS7,JDK8、Scala 2.12.6、Mysql、Canal、Flink1.9、Zookkeeper、Kafka

3.可视化方案

  • Tableau实时刷新Greenplum,FineBI也可以(秒级)
  • DataV也可以每几秒刷新一次
  • Flink计算后的结果,写入到缓存,前端开发可视化组件进行展示(实时展示)。

4.项目地址

由于CSDN不方便粘贴图片,详细内容请见:
FlinkStreamETL

https://github.com/liwei199411/FlinkStreamETL/tree/master

5.参考目录

[1].基于Spark Streaming + Canal + Kafka对Mysql增量数据实时进行监测分析

[2].Canal

[3].Canal 的 .NET 客户端

[4].如何基于MYSQL做实时计算?

[5].基于Canal与Flink实现数据实时增量同步(一)

[6].美团DB数据同步到数据仓库的架构与实践

[7].处理JSON格式的日志数据,然后进行流式Join

[8].Flink继续实践:从日志清洗到实时统计内容PV等多个指标

[9].实时数据架构体系建设思路

[10].Flink` 流与维表的关联

[11].Flink DataStream流表与维表Join(Async` I/O)

12. `flink 流表join mysql表

作者:岳过山丘
链接:https://www.jianshu.com/p/44583b98ecbb

13. `flink1.9 使用LookupableTableSource实现异步维表关联

作者:todd5167
链接:https://www.jianshu.com/p/7ebe1ec8aa7c

14. Flink异步之矛盾-锋利的Async I/O

作者:王知无
链接:https://www.jianshu.com/p/85ee258aa41f

15.Flink 的时间属性及原理解析

https://blog.csdn.net/zhengzhaoyang122/article/details/107352934?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-3

16.大屏数据可视化
https://yyhsong.github.io/iDataV/xiang.top/2020/03/05/基于Canal与Flink实现数据实时增量同步-一/)


版权声明:本文为leaeason原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/leaeason/article/details/107762692
推荐阅读
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • mysql-cluster集群sql节点高可用keepalived的故障处理过程
    本文描述了mysql-cluster集群sql节点高可用keepalived的故障处理过程,包括故障发生时间、故障描述、故障分析等内容。根据keepalived的日志分析,发现bogus VRRP packet received on eth0 !!!等错误信息,进而导致vip地址失效,使得mysql-cluster的api无法访问。针对这个问题,本文提供了相应的解决方案。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了如何在MySQL中将零值替换为先前的非零值的方法,包括使用内联查询和更新查询。同时还提供了选择正确值的方法。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文介绍了在MySQL8.0中如何查看性能并解析SQL执行顺序。首先介绍了查询性能工具的开启方法,然后详细解析了SQL执行顺序中的每个步骤,包括from、on、join、where、group by、having、select distinct、union、order by和limit。同时还介绍了虚拟表的概念和生成过程。通过本文的解析,读者可以更好地理解MySQL8.0中的性能查看和SQL执行顺序。 ... [详细]
  • MySQL多表数据库操作方法及子查询详解
    本文详细介绍了MySQL数据库的多表操作方法,包括增删改和单表查询,同时还解释了子查询的概念和用法。文章通过示例和步骤说明了如何进行数据的插入、删除和更新操作,以及如何执行单表查询和使用聚合函数进行统计。对于需要对MySQL数据库进行操作的读者来说,本文是一个非常实用的参考资料。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • ubuntu用sqoop将数据从hive导入mysql时,命令: ... [详细]
  • 小程序wxs中的时间格式化以及格式化时间和date时间互转
    本文介绍了在小程序wxs中进行时间格式化操作的问题,并提供了解决方法。同时还介绍了格式化时间和date时间的互相转换的方法。 ... [详细]
author-avatar
Ben_Design_114
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有