热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

binlog工具_基于Binlog实时同步数仓,有哪些不为人知的坑?

最近看到一篇文章《基于Canal与Flink实现数据实时增量同步》,主要讲解的是基于Flink有关于MySQLBinlog数据采集的方案,看了一下实践方
最近看到一篇文章《基于Canal与Flink实现数据实时增量同步》,主要讲解的是基于Flink有关于MySQL Binlog数据采集的方案,看了一下实践方法和具体代码操作,感觉有一些欠考虑和不足的情况。

笔者之前有过一些类似的采集工具实践的总结,但是并没有在整体上做出一个系统性的总结,所以我在想,是不是可以写一篇个人总结性的文章,把Binlog采集中的问题以及相应的解决方案也进行总结呢?

可能很多人对于Binlog的认识还不是很充足,会粗浅的认为:“它不就是MySQL产生的,有固定结构的log嘛,把数据采集过来,然后把它做一下数据落地,它有什么难的呢?”

的确,它本质上确实就是个log,可是实际上,关于Binlog采集从场景分析,再到技术选型,整体内部有很多不为人知的坑,不要小瞧了它。

笔者写这篇文章,目的是把实际工作中对于Binlog数据采集的开发流程的原则、注意事项、可能存在的问题点展示出来,其中也会有笔者自己的一些个人总结数据采集中的原则,为大家作参考,都是干货哦。

一、Binlog实时采集总结原则

首先抛开技术框架的讨论,个人总结Binlog日志的数据采集主要原则:

  • 原则一 :与业务数据解耦

  • 原则二 :与数据结构解耦

  • 原则三 :数据是可回溯的

分别阐述一下这三个原则的具体含义。

原则一

在数据采集中,数据落地一般都会使用时间分区进行落地,那就需要我们确定一下固定的时间戳作为时间分区的基础时间序列。

在这种情况下看来,业务数据上的时间戳字段,无论是从实际开发中获取此时间戳的角度,还是现实表中都会存在这样的时间戳,都不可能所有表完全满足。

举一下反例:

表 :业务时间戳(或事件时间)

table A :create_time,update_time

table B :create_time

table C :create_at

table D :无

像这样的情况,理论上可以通过限制RD和DBA的在设计表时规则化表结构来实现时间戳以及命名的统一化、做限制,但是是在实际工作中,这样的情况基本上是做不到的,相信很多读者也会遇到这样的情况。

可能很多做数据采集的同学会想,我们能不能要求他们去制定标准呢?

个人的想法是,可以,但是不能把大数据底层数据采集完全依靠这样互相制定的标准。

原因有以下三点:

  • 如果只是依靠两个部门或者多个部门制定的口头的或者书面的标准,却没有强制性在coding上面做约束,全部都是人为在约束的话,后期人员增加,迟早会出问题。

  • 大数据部门与后台部门,在于数据情况变更的情况,有时候可能是信息延时的,也就是说,有可能在数据落地后发现异常后,才知道后台部门做出了调整。

  • 也是最重要的一点,大数据部门不能要求在底层数据源去要求数据源去适应大数据的采集,这样要成的后果很有可能是限制后台部门在开发业务功能上的自由度,这样的开发流程也是不合理的。

所以如果想要使用唯一固定的时间序列,就要和业务的数据剥离开,我们想要的时间戳不受业务数据的变动的影响。

原则二

在业务数据库中,一定会存在表结构变更的问题,绝大部分情况为增加列,但是也会存在列重命名、列删除这类情况,而其中字段变更的顺序是不可控的。

此原则想描述的是,导入到数据仓库中的表,要适应数据库表的各种操作,保持其可用性与列数据的正确性。

原则三

此数据可回溯,其中包括两个方面:

  • 数据采集可回溯

  • 数据消费落地可回溯

第一个描述的是,在采集binlog采集端,可以重新按位置采集binlog。

第二个描述的是,在消费binlog落地的一端,可以重复消费把数据重新落地。

此为笔者个人总结,无论是选择什么样的技术选型进行组合搭建,这几点原则是需要具备的。

二、实现方案与具体操作

技术架构 :Debezium + Confluent + Kafka + OSS/S3 + Hive

基于原则一的解决方案

Debezium提供了New Record State Extraction的配置选项,相当于提供了一个transform算子,可以抽取出binlog中的元数据。

对于0.10版本的配置,可以抽取table,version,connector,name,ts_ms,db,server_id,file,pos,row等binlog元数据信息。

其中ts_ms为binlog日志的产生时间,此为binlog元数据,可以应用于所有数据表,而且可以在完全对数据表内部结构不了解的情况下,使用此固定时间戳,完全实现我们的原则一。

关于Debezium,不同版本之前的配置参数有可能是不同的,如果读者有需要实践的话需要在官方文档上确认相应版本的配置参数。

对于其他框架,例如市面上用的较多的Canal,或者读者有自己需要开发数据采集程序的话,binlog的元数据建议全部抽取出来,在此过程以及后续过程中都可能会被用到。

基于原则二的解决方案

对于Hive ,目前主流的数据存储格式为Parquet、ORC、Json、Avro这几种。

抛开数据存储的效率讨论。

对于前两种数据格式,为列存,也就是说,这两种数据格式的数据读取,会严格依赖于我们数据表中的数据存储的顺序,这样的数据格式,是无法满足数据列灵活增加、删除等操作的。

Avro格式为行存,但是它需要依赖于Schema Register服务,考虑Hive的数据表读取完全要依赖一个外部服务,风险过高。

最后确定使用Json格式进行数据存储,虽然这样的读取和存储效率没有其他格式高,但是这样可以保证业务数据的任何变更都可以在Hive中读取出来。

Debezium组件采集binlog的数据就是为json格式,和预期的设计方案是吻合的,可以解决原则二带来的问题。

对于其他框架,例如市面上用的较多的Canal,可以设置为Json数据格式进行传输,或者读者有自己需要开发数据采集程序的话,也是相同的道理。

基于原则三的解决方案

在采集binlog采集端,可以重新按位置采集binlog。

此方案实现方式在Debezium官方网站上也给出了相应的解决方案,大概描述一下,需要用到Kafkacat工具。

对于每一个采集的MySQL实例,创建数据采集任务时,Confluent都会相应的创建connector(也就是采集程序)的采集的元数据的topic,里面会存储相应的时间戳、文件位置、以及位置,可以通过修改此数据,重置采集binlog日志的位置。

值得注意的是,此操作的时间节点也是有限制的,和MySQL的binlog日志保存周期有关,所以此方式回溯时,需要确认的是MySQL日志还存在。

对于重复消费把数据重新落地。

此方案因为基于Kafka,对于Kafka重新制定消费offset消费位点的操作网上有很多方案,此处不再赘述。

对于读者自己实现的话,需要确认所选择的MQ支持此特性就好了。

Frequently Asked Questions:https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database

三、不同的业务场景

此部分只描述在笔者技术架构下如何实现以下操作,读者可以根据自己选择的技术组件探究不同的技术方案。

1、数据库分库分表的情况

基于Debezium的架构,一个Source端只能对应一个MySQL实例进行采集,对于同一实例上的分表情况,可以使用Debezium Topic Routing功能。

在采集过滤binlog时把相应需要采集的表按照正则匹配写入一个指定的topic中。

在分库的情况下,还需要在sink端增加RegexRouter transform算子进行topic间的合并写入操作。

2、数据增量采集与全量采集

对于采集组件,目前目前的配置都是以增量为默认,所以无论是选择Debezium还是Canal的话,正常配置就好。

但是有些时候会存在需要采集全表的情况,笔者也给出一下全量的数据采集的方案。

方案一:

Debezium本身自带了这样的功能,需要将snapshot.mode参数选型设置为when_needed,这样可以做表的全量采集操作。

官方文档中,在此处的参数配置有更加细致的描述。

Snapshots:https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots

方案二:

使用sqoop和增量采集同时使用的方式进行。

此方案适用于表数据已存在很多,而目前binlog数据频率不频繁的情况下,使用此方案。

值得注意的是有两点:

  • sqoop数据导入落地为Parquet格式,与增量采集数据合并时,需要做数据格式整合,也就是中间需要有临时表,通过union all的方式把数据merge到全量表中。

  • sqoop导入的Parquet格式,与Debezium处理某些数据类型时会存在不相同的问题,例如datetime类型,sqoop会导出string,Debezium会转化为bigint。

3、离线数据去重条件

数据落地后,通过json表映射出binlog原始数据,那么问题也就来了,我们如何找到最新的一条数据呢?

也许我们可以简单的认为,用我们刚刚的抽取的那个ts_ms,然后做倒排不就好了吗?

大部分情况下这样做确实是可以的。但是笔者在实际开发中,发现这样的情况是不能满足所有情况的,因为在binlog中,可能真的会存在ts_ms与PK相同,但是确实不同的两条数据。

那我们怎么去解决时间都相同的两条数据呢?答案就在上文,我们刚刚建议的把binlog的元数据都抽取出来。

SELECT *

FROM

(

SELECT *,

row_number() over(partition BY t.id ORDER BY t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int) DESC) AS order_by

FROM test t

WHERE dt='{pt}'

AND hour='{now_hour}'

) t1

WHERE t1.order_by = 1

解释一下这个sql中row_number的的条件:

  • __ts_ms :为binlog中的ts_ms,即事件时间。

  • __file :为binlog此条数据所在file name。

  • __pos :为binlog中此数据所在文件中的位置,为数据类型。

这样的条件组合取出的数据,就是最新的一条。

也许有读者会问,如果这条数据被删除了怎么办,你这样取出来的数据不就是错的了吗?

这个Debezium也有相应的操作,有相应的配置选项让你如何选择处理删除行为的binlog数据。

作为给大家的参考,笔者选择rewrite的参数配置,这样在上面的sql最外层只需要判断 “delete = ’false‘“ 就是正确的数据啦。

Debezium:https://debezium.io/documentation/reference/0.10/configuration/event-flattening.html

四、架构上的总结

在技术选型以及整体与细节的架构中,笔者始终坚持一个原则——流程尽量简约而不简单,数据环节越长,出问题的环节就可能越多,对于后期锁定问题与运维难度也会很高。

所以笔者在技术选型也曾考虑过Flink + Kafka的这种方式,但是基于当时的现状,笔者并没有选择这样的技术选型,笔者也阐述一下原因。

1)笔者的Flink环境没有做开发平台化与运维平台化。

2)场景偏向于数据采集和传输,而不是计算,Flink的优势特性并没有使用到很多。

3)如果基于一个MySQL实例开发一个Flink程序,使用原生的Flink steaming,做api式的程序开发,如果因为某些表的数据导致程序挂掉,这个实例的数据都无法采集了,这样的影响范围太大。

4)如果基于一个一个表或者通过正则的方式匹配一些表,做一个Flink程序,这样虽然是保证了灵活度,但是90%的代码都是冗余的,而且会有很多任务,浪费资源。

5)最后就是开发和维护效率的问题,如果只是写原生的Flink程序的话,后续的累加开发,会把程序变得越来越重,可能逻辑也会越来越繁琐。

总结起来,我当时对于Flink的思考,如果Flink没有做开发和运维监控的平台化的情况下,可以作为一个临时方案,但是后期如果一直在这样一个开发流程下缝缝补补,多人开发下很容易出现问题,或者就是大家都这样一个程序框架下造轮子,而且越造越慢。而且后期的主要项目方向并没有把Flink平台化提上日程,所以也是考虑了一部分未来的情况进行的选择。

因此个人最后确定技术选型的时候,并没有选用Flink。

五、结束语

本文笔者写得较为理论化,也是对此场景的一个技术思路方案总结。技术架构上的方案多种多样,笔者只是选择了其中一种进行实现,也希望大家有其他的技术方案或者理论进行交流,烦请指正。

作者丨李楠来源丨数据仓库与Python大数据(ID:dw_zzxx)dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn
我们正在经历一个数据量高速膨胀的时代,但这些海量的、分散的异构数据导致了数据资源价值低、应用难度大等问题。如何将海量数据充分挖掘与运用,来支撑决策、驱动业务发展、进行产品创新?如何利用大数据平台优化流程、服务、产品?可以说,所有的一切都离不开数据治理与数据资产管理。10月30日,DAMS中国数据智能管理峰会将在上海举办,专设数据治理及数据资产管理相关议题,抢先剧透如下:
  • 《企业数字化转型落地指南:从数据资产管理到数据中台(拟)》新炬网络 董事/副总经理 程永新
  • 《面向数据中台的数据治理如何建设与落地(拟)》网易 大数据总经理 余利华
  • 《大数据资产管理平台的设计、研发、运营实践》中国联通大数据 基础平台负责人 尹正军
  • 《腾讯游戏大数据资产管理实战:元数据管理与数据治理(拟)》腾讯游戏 大数据管理负责人 刘天斯
  • 《字节跳动数据治理实践》今日头条 数据BP/数据治理负责人 任长延
立即扫码享受限时优惠,一起探讨大数据战略从顶层设计到底层实现的落地过程。

cf60990e6104cbf017f30e80b9781d72.png



推荐阅读
  • 本文详细介绍了MysqlDump和mysqldump进行全库备份的相关知识,包括备份命令的使用方法、my.cnf配置文件的设置、binlog日志的位置指定、增量恢复的方式以及适用于innodb引擎和myisam引擎的备份方法。对于需要进行数据库备份的用户来说,本文提供了一些有价值的参考内容。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 本文介绍了如何使用PHP代码将表格导出为UTF8格式的Excel文件。首先,需要连接到数据库并获取表格的列名。然后,设置文件名和文件指针,并将内容写入文件。最后,设置响应头部,将文件作为附件下载。 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 解决VS写C#项目导入MySQL数据源报错“You have a usable connection already”问题的正确方法
    本文介绍了在VS写C#项目导入MySQL数据源时出现报错“You have a usable connection already”的问题,并给出了正确的解决方法。详细描述了问题的出现情况和报错信息,并提供了解决该问题的步骤和注意事项。 ... [详细]
  • 本文详细介绍了MySQL表分区的创建、增加和删除方法,包括查看分区数据量和全库数据量的方法。欢迎大家阅读并给予点评。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • ubuntu用sqoop将数据从hive导入mysql时,命令: ... [详细]
  • 本文介绍了将mysql从5.6.15升级到5.7.15的详细步骤,包括关闭访问、备份旧库、备份权限、配置文件备份、关闭旧数据库、安装二进制、替换配置文件以及启动新数据库等操作。 ... [详细]
  • WhenIusepythontoapplythepymysqlmoduletoaddafieldtoatableinthemysqldatabase,itdo ... [详细]
  • Windows7 64位系统安装PLSQL Developer的步骤和注意事项
    本文介绍了在Windows7 64位系统上安装PLSQL Developer的步骤和注意事项。首先下载并安装PLSQL Developer,注意不要安装在默认目录下。然后下载Windows 32位的oracle instant client,并解压到指定路径。最后,按照自己的喜好对解压后的文件进行命名和压缩。 ... [详细]
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
  • 本文分析了Wince程序内存和存储内存的分布及作用。Wince内存包括系统内存、对象存储和程序内存,其中系统内存占用了一部分SDRAM,而剩下的30M为程序内存和存储内存。对象存储是嵌入式wince操作系统中的一个新概念,常用于消费电子设备中。此外,文章还介绍了主电源和后备电池在操作系统中的作用。 ... [详细]
author-avatar
彭德利
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有