热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MySQL数据实时同步到KafkaBinlogcanal、Maxwell、KafkaConnect实现MySQL增量同步

一、需求分析早期业务借助Sqoop将Mysql中的数据同步到Hive、hdfs来进行数据分析,使用过程中也带来了一些问题:虽然Sqoop支持增量同步但还属于粗粒度的离线同步,无法满

一、需求分析

早期业务借助Sqoop将Mysql中的数据同步到Hive、hdfs来进行数据分析,使用过程中也带来了一些问题:

  • 虽然Sqoop支持增量同步但还属于粗粒度的离线同步,无法满足下游数仓实时性的需求(可能一个小时,或者一天) 
  • 每次同步Sqoop以sql的方式向Mysql发出数据请求也在一定程度上对Mysql带来一定的压力
  • 同时Hive对数据更新的支持也相对较弱,由于Hive本身的语法不支持更新、删除等SQL原语,对于MySQL中发生Update/Delete的数据无法很好地进行支持

现在的需求是需要数仓和mysql中的数据保持在秒级别的一致;

我们想到了MySQL主从复制时使用的binlog日志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗时间。

目前的思路就是实时监听mysql binlog日志,使mysql中的数据变化实时同步到数仓中(这里需要注意的是,我们只监听insert,update,delete这几种event)

binlog记录了Mysql数据的实时变化,是数据同步的基础,服务需要做的就是遵守Mysql的协议,将自己伪装成Mysql的slave来监听业务从库,完成数据实时同步。

二、MySQL主从复制的原理,主要有以下几个步骤:

  1. master(主库)在每次准备提交事务完成数据更新前,将改变记录到二进制日志(binary log)中
  2. slave(从库)发起连接,连接到master,请求获取指定位置的binlog文件
  3. master创建dump线程,推送binlog的slave
  4. slave启动一个I/O线程来读取主库上binary log中的事件,并记录到slave自己的中继日志(relay log)中
  5. slave还会起动一个SQL线程,该线程从relay log中读取事件并在备库执行,完成数据同步
  6. slave记录自己的binlog

 

三、整体架构: 直接解析binlog日志,然后解析后的数据写入kafka。

1)监听mysql的binlog日志工具分析:canal、Maxwell、Databus、DTS  
https://blog.csdn.net/weixin_38071106/article/details/88547660

2)mysql-binlog-connector-java (jar包直接读取binlog,然后send to kafka)

https://www.jianshu.com/p/5acb30ec8347

整体的架构如上图所示。在Binlog实时采集方面,我们采用了阿里巴巴的开源项目Canal,负责从MySQL实时拉取Binlog并完成适当解析。Binlog采集后会暂存到Kafka上供下游消费。整体实时采集部分如图中红色箭头所示。

离线处理Binlog的部分,如图中黑色箭头所示,通过下面的步骤在Hive上还原一张MySQL表:

  1. 采用Linkedin的开源项目Camus,负责每小时把Kafka上的Binlog数据拉取到Hive上。

  2. 对每张ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式。

  3. 对每张ODS表,每天基于存量数据和当天增量产生的Binlog做Merge,从而还原出业务数据。


四、Binlog实时采集

1、开启mysql的binlog

(1)binlog简介

binlog,即二进制日志,它记录了数据库上的所有改变,并以二进制的形式保存在磁盘中;

记录数据库增删改,不记录查询的二进制日志.语句以“事件”的形式保存,它描述数据更改。

它可以用来查看数据库的变更历史、数据库增量备份和恢复、Mysql的复制(主从数据库的复制)。

(2)Binary Log记录方式

binlog记录更新事件的方式有三种:

一:statement(基于语句的复制),保存的是高层的sql语句,优点是传输的数据比较少,缺点是很难做到主从一致,譬如rand()会在不同的地方产生不同的值

二:row(基于行的复制),5.1.5版本的MySQL才开始支持row level的复制,它不记录sql语句上下文相关信息,仅保存哪条记录被修改。仅需要记录那一条记录被修改成什么了。

保存的是记录变化前和变化后的数据,优点是不容易出错,可以做到原样复制,缺点是传输的数据可能会很多,譬如某个delete语句删除一个表里几百万行,基于statement的方式只会产生一个event,而基于row的方式会产生几百万条

三:mixed(混合方式),不会在从库产生歧义的语句只记录sql语句,会产生歧义的语句使用row方式,兼顾前两者的优点

选择哪种可以在my.cnf里的binlog_formate进行修改

ps:在后续开发中将使用Row格式

(3)开启mysql的binlog

a.修改 /etc/my.cnf 配置

  1. log-bin=mysql-bin # 开启binlog

  2. binlog-format=ROW # 设置Binary Log记录方式为Row

  3. server_id=1 # 记住id 后续开发会使用

  4. # 指定binlog日志文件的名字为mysql-bin,以及其存储路径

  5. # 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。

  6. # 而 mysqld-bin.index则记录了所有的log的文件名称

  7. # 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。

  8.  
  9. log-bin=/var/lib/mysql/mysql-bin

  10. datadir=/var/lib/mysql

  11. socket=/var/lib/mysql/mysql.sock

  12.  

b.重启mysql


service mysqld restart

c.查看开启状态

输入 show variables like 'log_bin'; 查看binlog开启状态。如下图所示。

输入 show variables like 'binlog_format';  查看Binary Log记录方式。如下图所示。

  1. mysql> show variables like 'log_%';

  2. +---------------------------------+-------------+

  3. | Variable_name | Value |

  4. +---------------------------------+-------------+

  5. | log_bin | ON |

  6. | log_bin_trust_function_creators | OFF |

  7. | log_error | .\mysql.err |

  8. | log_queries_not_using_indexes | OFF |

  9. | log_slave_updates | OFF |

  10. | log_slow_queries | ON |

  11. | log_warnings | 1 |

  12. +---------------------------------+-------------+

  13.  
  14. 没有开启log_bin的值是OFF,开启之后是ON


  1. mysql> SHOW VARIABLES LIKE 'character%';

  2. +--------------------------+----------------------------+

  3. | Variable_name | Value |

  4. +--------------------------+----------------------------+

  5. | character_set_client | utf8 |

  6. | character_set_connection | utf8 |

  7. | character_set_database | latin1 |

  8. | character_set_filesystem | binary |

  9. | character_set_results | utf8 |

  10. | character_set_server | latin1 |

  11. | character_set_system | utf8 |

  12. | character_sets_dir | /usr/share/mysql/charsets/ |

  13. +--------------------------+----------------------------+


注意:每次服务器(数据库)重启,服务器会调用flush logs;,新创建一个binlog日志

        由于我之前重启过数据库,因此这里有mysql-bin.000001到mysql-bin.000002这2个文件。这里你们看到的

应该只有mysql-bin.000001和mysql-bin.index两个文件

查看指定binlog文件的内容



mysql> show binlog events in 'mysql-bin.000002';  或者cat /var/lib/mysql/mysql-bin.000002

  1. # /var/lib/mysql/mysql-bin.000002 是二进制文件路径

  2.  
  3. mysqlbinlog --no-defaults --database=test --start-datetime="2017-09-17 07:21:09" --stop-datetime="2019-09-19 07:59:50" mysql_bin.000002 > sanjiaomao.txt

  4. mysqlbinlog --no-defaults --database=test --start-datetime="2017-09-17 07:21:09" --stop-datetime="2019-09-19 07:59:50" mysql_bin.000002 | more

  5.  
  6. #v如果需要过滤,只查询insert,update,delete的语句,可以这样写:

  7. mysqlbinlog --no-defaults --database=test mysql_bin.000002 |grep update |more

  8.  
  9. mysqlbinlog --no-defaults --start-position=2098 --stop-position=2205 -d test /var/lib/mysql/mysql_bin.000002


5、mysql工具mysqlbinlog常用参数

mysqlbinlog命令常用参数

参数说明

-d ,--database=name

根据指定库拆分binlog(拆分单表binlog可通过SQL关键字过滤)

-r ,--result-file=name

指定解析binlog输出SQL语句的文件

-R,--read-from-remote-server

从mysql服务器读取binlog日志,是下面参数的别名

-j,--start-position=#

读取binlog的起始位置点,#号是具体的位置点

--stop-position=#

读取binlog的停止位置点,#号是具体的位置点

--start-datetime=name

读取binlog的起始位置点,name是具体的时间,格式为:2004-12-25 11:25:26

--stop-datetime=name

读取binlog的停止位置点,name是具体的时间,格式为:2004-12-25 11:25:26

--base64-output=decode-rows

解析row级别binlog日志的方法,例如:mysqlbinlog  --base64-output=decode-rows -v  mysqlbin.000016

方式一:mysql-binlog-connector-java

  1. import com.github.shyiko.mysql.binlog.BinaryLogClient;

  2. import com.github.shyiko.mysql.binlog.event.Event;

  3.  
  4. public class BinlogParse {

  5.  
  6. public static void main(String[] args) throws Exception {

  7. final BinaryLogClient client = new BinaryLogClient("10.23.92.189", 3306, "root", "hadoop");

  8. /*

  9. 这里需要注意的是,如果你不指定如下参数,程序将从mysql当前binlog位置开始同步数据,

  10. 这显然不是我们需要的,更多的时候我们需要灵活的从任意位置读取数据

  11. */

  12. client.setBinlogFilename("mysql-bin.000002");

  13. client.setBinlogPosition(123); //从指定的位置读取binlog,具体位置可以在mysql中查看

  14. client.registerEventListener(new BinaryLogClient.EventListener() {

  15.  
  16. public void onEvent(Event event) {

  17.  
  18. System.out.println(event.toString());

  19. System.out.println(client.getBinlogPosition());

  20.  
  21. }

  22. });

  23. client.connect();

  24. }

  25. }

  26. /*...之后我手动登录到mysql,分别进行了updata,insert,delete,监听到的log如下:

  27. Event{header=EventHeaderV4{timestamp=1529247253000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=34, nextPosition=346, flags=0}, data=TableMapEventData{tableId=110, database='testdb', table='hello', columnTypes=8, 15, columnMetadata=0, 40, columnNullability={1}}}

  28. Event{header=EventHeaderV4{timestamp=1529247253000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=46, nextPosition=411, flags=0}, data=UpdateRowsEventData{tableId=110, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[

  29. {before=[3, Will], after=[3, David]}

  30. ]}}

  31. ...

  32. Event{header=EventHeaderV4{timestamp=1529247347000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=34, nextPosition=634, flags=0}, data=TableMapEventData{tableId=110, database='testdb', table='hello', columnTypes=8, 15, columnMetadata=0, 40, columnNullability={1}}}

  33. Event{header=EventHeaderV4{timestamp=1529247347000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=684, flags=0}, data=WriteRowsEventData{tableId=110, includedColumns={0, 1}, rows=[

  34. [4, Frank]

  35. ]}}

  36. ...

  37. Event{header=EventHeaderV4{timestamp=1529247404000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=34, nextPosition=907, flags=0}, data=TableMapEventData{tableId=110, database='testdb', table='hello', columnTypes=8, 15, columnMetadata=0, 40, columnNullability={1}}}

  38. Event{header=EventHeaderV4{timestamp=1529247404000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=30, nextPosition=956, flags=0}, data=DeleteRowsEventData{tableId=110, includedColumns={0, 1}, rows=[

  39. [2, Bill]

  40. ]}}

  41. */


读出来的binlog日志我们会封装成一个json字符串push到kafka,然后再从kafka消费数据到数仓,这样的话数仓中就有最实时的数据,olap引擎采用impala

需要注意的是delete from table 这种操作,表中数据是一条一条删除,如果表中数据非常大的话binlog日志也会非常大

二、和上面同样代码 https://www.jb51.net/article/166757.htm

  1. public static void main(String[] args) {

  2. BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");

  3. EventDeserializer eventDeserializer = new EventDeserializer();

  4. eventDeserializer.setCompatibilityMode(

  5. EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,

  6. EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY

  7. );

  8. client.setEventDeserializer(eventDeserializer);

  9. client.registerEventListener(new BinaryLogClient.EventListener() { @

  10. Override

  11. public void onEvent(Event event) {

  12. // TODO

  13. dosomething();

  14. logger.info(event.toString());

  15. }

  16. });

  17. client.connect();

  18. }

  19. /**

  20. 这个完全是根据官方教程里面写的,在onEvent里面可以写自己的业务逻辑,由于我只是测试,所以我在里面将每一个event都打印了出来.

  21. 之后我手动登录到mysql,分别进行了增加,修改,删除操作,监听到的log如下:

  22. 00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}

  23. 00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}

  24. 00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}

  25. 00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executiOnTime=0, errorCode=0, database='pf', sql='BEGIN'}}

  26. 00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}

  27. 00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[

  28. [[B@546a03af, 2]

  29. ]}}

  30. 00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}

  31. 00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}

  32. 00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executiOnTime=0, errorCode=0, database='pf', sql='BEGIN'}}

  33. 00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}

  34. 00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[

  35. {before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}

  36. ]}}

  37. 00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}

  38. 00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}

  39. 00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executiOnTime=0, errorCode=0, database='pf', sql='BEGIN'}}

  40. 00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}

  41. 00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[

  42. [[B@1888ff2c, 3]

  43. ]}}

  44. 00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}

  45. */


实现思路

  1. 支持对单个表的监听,因为我们不想真的对所有数据库中的所有数据表进行监听.
  2. 可以多线程消费.
  3. 把监听到的内容转换成我们喜闻乐见的形式(文中的数据结构不一定很好,我没想到更加合适的了).

实现思路大致如下:

  1. 封装个客户端,对外只提供获取方法,屏蔽掉初始化的细节代码.
  2. 提供注册监听器(伪)的方法,可以注册对某个表的监听(重新定义一个监听接口,所有注册的监听器实现这个就好).
  3. 真正的监听器只有客户端,他将此数据库实例上的所有操作,全部监听到并转换成我们想要的格式LogItem放进阻塞队列里面.
  4. 启动多个线程,消费阻塞队列,对某一个LogItem调用对应的数据表的监听器,做一些业务逻辑.

  1. //初始化代码:

  2. public MysqlBinLogListener(Conf conf) {

  3. BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);

  4. EventDeserializer eventDeserializer = new EventDeserializer();

  5. eventDeserializer.setCompatibilityMode(

  6. EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,

  7. EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY

  8. );

  9. client.setEventDeserializer(eventDeserializer);

  10. this.parseClient = client;

  11. this.queue = new ArrayBlockingQueue <> (1024);

  12. this.cOnf= conf;

  13. listeners = new ConcurrentHashMap <> ();

  14. dbTableCols = new ConcurrentHashMap <> ();

  15. this.cOnsumer= Executors.newFixedThreadPool(consumerThreads);

  16. }

  17.  
  18.  
  19. //注册代码:

  20. public void regListener(String db, String table, BinLogListener listener) throws Exception {

  21. String dbTable = getdbTable(db, table);

  22. Class.forName("com.mysql.jdbc.Driver");

  23. // 保存当前注册的表的colum信息

  24. Connection cOnnection= DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);

  25. Map cols = getColMap(connection, db, table);

  26. dbTableCols.put(dbTable, cols); 

  27. // 保存当前注册的listener

  28. List list = listeners.getOrDefault(dbTable, new ArrayList <> ());

  29. list.add(listener);

  30. listeners.put(dbTable, list);

  31. }

  32.  
  33. //在这个步骤中,我们在注册监听者的同时,获得了该表的schema信息,并保存到map里面去,方便后续对数据进行处理.

  34. //监听代码:

  35. @Override

  36. public void onEvent(Event event) {

  37. EventType eventType = event.getHeader().getEventType(); 

  38. if (eventType == EventType.TABLE_MAP) {

  39. TableMapEventData tableData = event.getData();

  40. String db = tableData.getDatabase();

  41. String table = tableData.getTable();

  42. dbTable = getdbTable(db, table);

  43. // 只处理添加删除更新三种操作

  44. if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {

  45. if (isWrite(eventType)) {

  46. WriteRowsEventData data = event.getData();

  47. for (Serializable[] row: data.getRows()) { 

  48. if (dbTableCols.containsKey(dbTable)) { 

  49. LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable)); 

  50. e.setDbTable(dbTable); 

  51. queue.add(e); 

  52. }

  53. }

  54. }

  55. }

  56. }

  57.  
  58. //消费代码:

  59.  
  60. public void parse() throws IOException {

  61. parseClient.registerEventListener(this); 

  62. for (int i = 0; i

  63. consumer.submit(() - > {

  64. while (true) { 

  65. if (queue.size() > 0) { 

  66. try { 

  67. LogItem item = queue.take(); 

  68. String dbtable = item.getDbTable(); 

  69. listeners.get(dbtable).forEach(l - > { 

  70. l.onEvent(item); 

  71. });  

  72. } catch (InterruptedException e) { 

  73. e.printStackTrace(); 

  74. Thread.sleep(1000);

  75. }

  76. });

  77. }

  78. parseClient.connect();

  79. }

  80.  
  81. //消费时,从队列中获取item,之后获取对应的一个或者多个监听者,分别消费这个item.

  82. //测试代码:

  83.  
  84. public static void main(String[] args) throws Exception {

  85. Conf cOnf= new Conf();

  86. conf.host = "hostname";

  87. conf.port = 3306;

  88. conf.username = conf.passwd = "hhsgsb"; 

  89. MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);

  90. mysqlBinLogListener.parseArgsAndRun(args);

  91. mysqlBinLogListener.regListener("pf", "student", item - > {

  92. System.out.println(new String((byte[]) item.getAfter().get("name")));

  93. logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());

  94. });

  95. mysqlBinLogListener.regListener("pf", "teacher", item - > System.out.println("teacher ====")); 

  96. mysqlBinLogListener.parse();

  97. }


在这段很少的代码里,注册了两个监听者,分别监听student和teacher表,并分别进行打印处理,经测试,在teacher表插入数据时,可以独立的运行定义的业务逻辑.

注意:这里的工具类并不能直接投入使用,因为里面有许多的异常处理没有做,且功能仅监听了插入语句,可以用来做实现的参考.

方式二、基于Spark Streaming + Canal + Kafka对Mysql增量数据实时进行监测分析

基于Spark Streaming + Canal + Kafka对Mysql增量数据实时进行监测分析https://blog.csdn.net/github_39577257/article/details/88661052

方式三、基于Maxwell详解

  1. MySQL 配置权限

  2.  
  3. --创建 用户

  4. mysql> CREATE USER canal IDENTIFIED BY 'canal';

  5. mysql>CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY '123456';

  6. -- % : 匹配所有主机

  7. -- Maxwell需要在待同步的库上建立schema_database库,将状态存储在`schema_database`选项指定的数据库中(默认为`maxwell`)

  8.  
  9. --授权

  10. #全部权限

  11. mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

  12. mysql> GRANT ALL PRIVILEGES ON *.* TO ‘username’@‘%’ IDENTIFIED BY 'password';

  13. mysql> grant all privileges on databasename.* to username;

  14. mysql> GRANT ALL PRIVILEGES on maxwell.* to 'maxwell_sync'@'192.168.85.133';

  15. --解释:grant all指的是授权所有操作权限(增删改查),*.*指的是所有数据库,maxwell指的是数据库名,maxwell_sync是用户,192.168.85.133指的是所要授权的远程IP地址

  16.  
  17. mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%';

  18. #部分权限

  19. mysql> grant select,insert,update,delete,create on databasename.* to username;

  20.  
  21. mysql>FLUSH PRIVILEGES; -- 刷新mysql用户权限,使修改生效


启动Maxwell

  1. 主要是配置config.properties文件,重要的配置参考如下:

  2. log_level=info

  3. host=

  4. user=

  5. password=

  6. port=

  7. jdbc_optiOns=autoRecOnnect=true // mysql 超时重连

  8. schema_database= // 用于在mysql中新建一个binlog相关的数据库实例

  9. producer=kafka

  10. kafka.bootstrap.servers=

  11. kafka_topic=

  12. kafka.compression.type=snappy

  13. kafka.retries=1

  14. kafka.acks=1

  15. kinesis_stream=maxwell

  16. include_dbs= // 需要处理的数据库实例

  17. include_tables= // 需要处理的表格,用逗号分隔

  18. kafka_version=0.9.0.1

  19. client_id= // 标识符,可以包含英文

  20. replica_server_id= // 只能是数字

  21. expire_logs_days=0 //防止binlog断,maxwell失败

  22.  
  23. # 可以用配置文件,也可以直接跟参数

  24.  
  25. 启动maxwell 配置文件

  26. nohup bin/maxwell --config config.properties --log_level DEBUG &


  1. #输出到kafka

  2. bin/maxwell --user='maxwell_sync'

  3. --password='123456'

  4. --host='192.168.85.133'

  5. --port=3306

  6. --producer=kafka

  7. --kafka.bootstrap.servers=192.168.85.133:9092

  8. --kafka_version=0.9

  9. --kafka_topic=mysql_binlog

  10.  
  11. 可以多跟几个参数对mysql的binlog进行过滤,只筛选某些数据库里的某些表

  12. include_dbs,

  13. exclude_dbs,

  14. include_tables,

  15. exclude_tables

  16.  
  17. #抽取多个库可添加include_dbs,逗号分隔

  18. #指定数据表 include_tables ,逗号分隔

  19. #maxwell模拟mysql slave,所以多个maxwell进程时,每个进程的client.id及replica_server_id保证不同

  20. #binlog如果断了,可能会maxwell失败,最好设置mysql的expire_logs_days=0

  21.  
  22. #输出到控制台用如下配置

  23. bin/maxwell --user='maxwell_sync' --password='123456' --host='192.168.85.133' --producer=stdout


测试STDOUT:

  1. mysql> insert into ruozedata(id,name,age,address) values(999,'jepson',18,'www.ruozedata.com');

  2.  
  3. maxwell输出:

  4. {

  5. "database": "ruozedb",

  6. "table": "ruozedata",

  7. "type": "insert",

  8. "ts": 1525959044,

  9. "xid": 201,

  10. "commit": true,

  11. "data": {

  12. "id": 999,

  13. "name": "jepson",

  14. "age": 18,

  15. "address": "www.ruozedata.com",

  16. "createtime": "2018-05-10 13:30:44",

  17. "creuser": null,

  18. "updatetime": "2018-05-10 13:30:44",

  19. "updateuser": null

  20. }

  21. }


  1. mysql> update ruozedata set age=29 where id=999;

  2. 问题: ROW,你觉得binlog更新几个字段?

  3.  
  4. maxwell输出:

  5. {

  6. "database": "ruozedb",

  7. "table": "ruozedata",

  8. "type": "update",

  9. "ts": 1525959208,

  10. "xid": 255,

  11. "commit": true,

  12. "data": {

  13. "id": 999,

  14. "name": "jepson",

  15. "age": 29,

  16. "address": "www.ruozedata.com",

  17. "createtime": "2018-05-10 13:30:44",

  18. "creuser": null,

  19. "updatetime": "2018-05-10 13:33:28",

  20. "updateuser": null

  21. },

  22. "old": {

  23. "age": 18,

  24. "updatetime": "2018-05-10 13:30:44"

  25. }

  26. }


改变数据库内容可看到如下结果:

 启动kafka

  1. #开启kafka消费者

  2. kafka-console-consumer.sh --zookeeper localhost:2181 --topic rotopic --from-beginning


1、Merge

Binlog成功入仓后,下一步要做的就是基于Binlog对MySQL数据进行还原。Merge流程做了两件事,首先把当天生成的Binlog数据存放到Delta表中,然后和已有的存量数据做一个基于主键的Merge。Delta表中的数据是当天的最新数据,当一条数据在一天内发生多次变更时,Delta表中只存储最后一次变更后的数据。

把Delta数据和存量数据进行Merge的过程中,需要有唯一键来判定是否是同一条数据。如果同一条数据既出现在存量表中,又出现在Delta表中,说明这一条数据发生了更新,则选取Delta表的数据作为最终结果;否则说明没有发生任何变动,保留原来存量表中的数据作为最终结果。Merge的结果数据会Insert Overwrite到原表中,即图中的origindb.table

Merge流程举例

下面用一个例子来具体说明Merge的流程。

数据表共id、value两列,其中id是主键。在提取Delta数据时,对同一条数据的多次更新,只选择最后更新的一条。所以对id=1的数据,Delta表中记录最后一条更新后的值value=120。Delta数据和存量数据做Merge后,最终结果中,新插入一条数据(id=4),两条数据发生了更新(id=1和id=2),一条数据未变(id=3)。

默认情况下,我们采用MySQL表的主键作为这一判重的唯一键,业务也可以根据实际情况配置不同于MySQL的唯一键。

合并更新删除操作 : https://blog.csdn.net/qq_22473611/article/details/102901625

2、Delete删除

Delete操作在MySQL中非常常见,由于Hive不支持Delete,如果想把MySQL中删除的数据在Hive中删掉,需要采用“迂回”的方式进行。

对需要处理Delete事件的Merge流程,采用如下两个步骤:

  • 首先,提取出发生了Delete事件的数据,由于Binlog本身记录了事件类型,这一步很容易做到。将存量数据(表A)与被删掉的数据(表B)在主键上做左外连接(Left outer join),如果能够全部join到双方的数据,说明该条数据被删掉了。因此,选择结果中表B对应的记录为NULL的数据,即是应当被保留的数据。

  • 然后,对上面得到的被保留下来的数据,按照前面描述的流程做常规的Merge。


 

二、Kafka Connect 实现MySQL增量同步(A表实时同步到B表)

https://www.jianshu.com/p/46b6fa53cae4

3. Kafka Connect

3.1 Connector

Kafka Connect是一个用于Kafka和其他数据系统之间进行数据传输的工具,它可以实现基于Kafka的数据管道,打通上下游数据源。我们需要做的就是在Kafka Connect服务上运行一个Connector,这个Connector是具体实现如何从/向数据源中读/写数据。Confluent提供了很多Connector实现,你可以在这里下载。不过今天我们使用Debezium提供的一个MySQL Connector插件,下载地址。

下载这个插件,并将解压出来的jar包全部拷贝到kafka lib目录下。注意:需要将这些jar包拷贝到Kafka集群所有机器上。

本文将使用Kafka Connect 实现MySQL增量同步,设计三种模式,分别为incrementing、timestamp、 timestamp+incrementing

1、incrementing 自增模式(A表实时同步到B表)

切换目录 *\kafka_2.11-2.0.1\config

quickstart-mysql.properties(source)

  1. name=mysql-a-source-person

  2. connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

  3. tasks.max=1

  4. connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***

  5. # incrementing 自增

  6. mode=incrementing

  7. # 自增字段 pid

  8. incrementing.column.name=pid

  9. # 白名单表 person

  10. table.whitelist=person

  11. # topic前缀 mysql-kafka-

  12. topic.prefix=mysql-kafka-


quickstart-mysql-sink.properties(sink)

  1. name=mysql-a-sink-person

  2. connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

  3. tasks.max=1

  4. #kafka的topic名称

  5. topics=mysql-kafka-person

  6. # 配置JDBC链接

  7. connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***

  8. # 不自动创建表,如果为true,会自动创建表,表名为topic名称

  9. auto.create=false

  10. # upsert model更新和插入

  11. insert.mode=upsert

  12. # 下面两个参数配置了以pid为主键更新

  13. pk.mode = record_value

  14. pk.fields = pid

  15. #表名为kafkatable

  16. table.name.format=kafkaperson


启动 Kafka Connect

  1. D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat

  2. D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties

  3. D:/com/kafka_2.11-2.0.1/config/quickstart-mysql.properties

  4. D:/com/kafka_2.11-2.0.1/config/quickstart-mysql-sink.properties


2、timestamp 时间戳模式

D:\com\kafka_2.11-2.0.1\config

timestamp-mysql-source.properties(source)

  1. name=mysql-b-source-comments

  2. connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

  3. tasks.max=1

  4. connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***

  5. table.whitelist=comments

  6. mode=timestamp

  7. timestamp.column.name=commenttime

  8. topic.prefix=mysql-kafka-


timestamp-mysql-sink.properties(sink)

  1. name=mysql-b-sink-comments

  2. connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

  3. tasks.max=1

  4. #kafka的topic名称

  5. topics=mysql-kafka-comments

  6. # 配置JDBC链接

  7. connection.url=jdbc:mysql://localhost:3306/B?user=***&password=***

  8. # 不自动创建表,如果为true,会自动创建表,表名为topic名称

  9. auto.create=false

  10. # upsert model更新和插入

  11. insert.mode=upsert

  12. # 下面两个参数配置了以id为主键更新

  13. pk.mode = record_value

  14. pk.fields = id

  15. #表名为kafkatable

  16. table.name.format=kafkacomments


  1. D:\com\kafka_2.11-2.0.1\bin\windows>connect-standalone.bat

  2. D:/com/kafka_2.11-2.0.1/config/connect-standalone.properties

  3. D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-source.properties

  4. D:/com/kafka_2.11-2.0.1/config/timestamp-mysql-sink.properties


 

此时修改id为2和4的内容content,并修改评论时间commenttime


update comments set cOntent= "show test data" ,commenttime="2018-12-20 15:55:10" where id in(2,4)

3、timestamp+incrementing 时间戳自增混合模式

实验过程同方法2不做赘述,唯一变动的是source的config文件

  1. name=mysql-b-source-comments

  2. connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

  3. tasks.max=1

  4. connection.url=jdbc:mysql://localhost:3306/A?user=***&password=***

  5. table.whitelist=comments

  6. mode=timestamp+incrementing

  7. timestamp.column.name=commenttime

  8. incrementing.column.name=id

  9. topic.prefix=mysql-kafka-



推荐阅读
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
  • 不会搭建大数据平台,我被老板优化了...
    不会,搭建,大数,据,平台,我 ... [详细]
  • 数据仓库中基本概念
    一、数据仓库数据仓库(DataWarehouse)是一个面向主题的、集成的、稳定的且随时间变化的数据集合,用于支持管理人员的决策面向主题主题就是类型的意思。传统数 ... [详细]
  • 大数据开发笔记(一):HDFS介绍
    ✨大数据开发笔记推荐:大数据开发面试知识点总结_GoAI的博客-CSDN博客_大数据开发面试​本文详细介绍大数据hadoop生态圈各部分知识,包括不限 ... [详细]
  • 智能制造数据综合分析与应用解决方案
    在智能制造领域,生产数据通过先进的采集设备收集,并利用时序数据库或关系型数据库进行高效存储。这些数据经过处理后,通过可视化数据大屏呈现,为生产车间、生产控制中心以及管理层提供实时、精准的信息支持,助力不同应用场景下的决策优化和效率提升。 ... [详细]
  • Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及流式消费变化数据的能力。应用场景近实时数据摄取Hudi支持插入、更新和删除数据的能力。您 ... [详细]
  • SSAS入门指南:基础知识与核心概念解析
    ### SSAS入门指南:基础知识与核心概念解析Analysis Services 是一种专为决策支持和商业智能(BI)解决方案设计的数据引擎。该引擎能够为报告和客户端应用提供高效的分析数据,并支持在多维数据模型中构建高性能的分析应用。通过其强大的数据处理能力和灵活的数据建模功能,Analysis Services 成为了现代 BI 系统的重要组成部分。 ... [详细]
  • 在Linux系统中,原本已安装了多个版本的Python 2,并且还安装了Anaconda,其中包含了Python 3。本文详细介绍了如何通过配置环境变量,使系统默认使用指定版本的Python,以便在不同版本之间轻松切换。此外,文章还提供了具体的实践步骤和注意事项,帮助用户高效地管理和使用不同版本的Python环境。 ... [详细]
  • sh cca175problem03evolveavroschema.sh ... [详细]
  • hive和mysql的区别是什么[mysql教程]
    hive和mysql的区别有:1、查询语言不同,hive是hql语言,MySQL是sql语句;2、数据存储位置不同,hive把数据存储在hdfs上,MySQL把数据存储在自己的系统 ... [详细]
  • 转载:https:blog.csdn.nethigh2011articledetails70155431清华大学镜像网:https:mirrors.tu ... [详细]
  • 以Flink为例,消除流处理常见的六大谬见
    以Flink为例,消除流处理常见的六大谬见 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • 前言本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出, ... [详细]
  • 怎么快速学好大数据开发?
    新如何学习大数据技术?大数据怎么入门?怎么做大数据分析?数据科学需要学习那些技术?大数据的应用前景等等问题,已成为热门大数据领域热门问题,以下是对新手如何学习大数据技术问题的解答! ... [详细]
author-avatar
shi6321
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有