作者:我爱板材_527 | 来源:互联网 | 2023-10-10 15:16
Canal和Kafka整合方案——解决Canal写入Kafka并发消费问题-一、问题描述在使用Canal读取binlog来对数据库增量进行同步时遇到一下几个问题首先是在使用C
一、问题描述
在使用Canal读取binlog来对数据库增量进行同步时遇到一下几个问题
- 首先是在使用Canal自带客户端进行同步时需要自己手动调用get()或者getWithoutAck()进行拉取
- 拉取日志后进行同步只能一条一条处理,效率比较低
- 如果处理日志过慢或者其他原因容易导致日志堆积在服务器
为了解决上面的问题打算在日志同步过程中引入MQ来作为中间同步,Canal支持RocketMQ和Kafka两种,最终选用Kafka来进行
二、引入Kafka
1.Canal整合Kafka及项目初步搭建
Kafka与Canal整合在Canal搭建中已经介绍过,整体搭建过程不变。
- 这次以一组房间数据作为示例,数据库:demo
- 表名:pricing_house_info
- Kafka的topic:house-topic
完成Canal搭建和整合Kafka之后,启动Canal,然后往pricing_house_info表中插入数据,并在Kafka消费数据,搭建SpringBoot项目并整合Kafka(SpringBoot整合Kafka在之前的博文中有描述:SpringBoot整合Kafka),然后完成相关配置后启动项目在Kafka控制台中看到Canal同步过来的数据表示搭建成功,并且在项目中也成功消费。
2.整合Kafka后引出新问题
此时如果按照这样的方式,确实能够缓解Canal服务器压力以及不再手动拉取日志的问题,但是这样的方式依然是在一条一条的消费消息,性能并未得到提升。
如何解决这样的问题?首先肯定想到的是多线程并发消费,如果我们单纯地用多线程并发消费的话并不能保证消息的有序性,这种binlog日志同步是需要严格有序性的,否则会导致数据错乱。那有没有办法能够保证顺序的情况下并发消费呢?答案是有的,即将指定数据发送到指定分区当中,然后起多个消费者消费不同分区的数据即可,并且Canal提供写入指定分区的配置
三、最终方案
1.修改Canal配置文件
在Canal配置文件中的mq config里面配置如下
# mq config
canal.mq.topic=house-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# canal.mq.partition=0
# hash partition config
canal.mq.partitiOnsNum=3
canal.mq.partitiOnHash=demo\\.pricing_house_info:house_code
这里面主要配置了canal.mq.partitionsNum和canal.mq.partitionHash两个参数,他们的意思如下:
- canal.mq.partitionsNum:指定当前topic的分区数
- canal.mq.partitionHash:指定到分区的分区规则,可以细化到字段
这里我们指定topic有3个分区,并且使用pricing_house_info表中的house_code字段来进行划分,即让相同house_code的数据全部到一个分区当中
2.修改项目代码
原先项目中只有一个消费,现在再添加两个消费的方法,让三个消费者能够消费不同分区的数据,通过@TopicPartition注解指定topic和对应的分区,并且可以同时消费多个分区的数据,三个消费者的groupId一定要保持一致,因为Kafka指定在一个group里面一条partition的消息只能被一个消费者消费
@Component
public class MessageListener {
@KafkaListener(topicPartitiOns= {@TopicPartition(topic = "house-topic",partitiOns= {"0"})}, groupId = "house-consumer-group")
public void partition1(ConsumerRecord, ?> record){
receive(record);
}
@KafkaListener(topicPartitiOns= {@TopicPartition(topic = "house-topic",partitiOns= {"1"})}, groupId = "house-consumer-group")
public void partition2(ConsumerRecord, ?> record){
receive(record);
}
@KafkaListener(topicPartitiOns= {@TopicPartition(topic = "house-topic",partitiOns= {"2"})}, groupId = "house-consumer-group")
public void partition3(ConsumerRecord, ?> record){
receive(record);
}
private void receive(ConsumerRecord, ?> record){
final String value = (String) record.value();
FlatMessage flatMessage = JSONObject.parseObject(value, FlatMessage.class);
final String houseCode = flatMessage.getData().get(0).get("house_code");
System.out.println("分区:"+record.partition()+"\t接收到数据的code:"+houseCode+"\t操作类别:"+flatMessage.getType());
}
}
通过这样的方式我们可以确保相同house_code的数据到同一个分区被同一个消费者有序消费且只消费一次,这样即可达到目的
3.整体架构
4.结果验证
全部配置修改好后重启Canal和项目,然后往数据库里写数据,控制台结果如下:
分区:0 接收到数据的code:CD000001 操作类别:INSERT
分区:1 接收到数据的code:CD000002 操作类别:INSERT
分区:0 接收到数据的code:CD000003 操作类别:INSERT
分区:1 接收到数据的code:CD000004 操作类别:INSERT
分区:2 接收到数据的code:CD000005 操作类别:INSERT
分区:0 接收到数据的code:CD000006 操作类别:INSERT
分区:2 接收到数据的code:CD000007 操作类别:INSERT
分区:0 接收到数据的code:CD000008 操作类别:INSERT
分区:1 接收到数据的code:CD000009 操作类别:INSERT
分区:0 接收到数据的code:CD000010 操作类别:INSERT
可以看到不同house_code的数据被写入了不同的分区,并且三个消费者消费到了来自三个分区的数据,此时如果在house_code为CD000010的数据上进行修改,得到如下结果:
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
分区:0 接收到数据的code:CD000010 操作类别:UPDATE
此时更进一步证明了同一个partition只存固定的house_code的数据,保证了单个partition和消费者的消息有序性。
四、总结思考
利用Canal将数据根据字段写入不同分区且消费者消费指定分区数据,增加了消费的吞吐量,并且保证了单个消费者的消息有序性以及单条记录(同一house_code的数据)的处理有序性,本方案是在单行数据基础上来进行分区匹配的,还可以在表和数据库的基础上进行分区匹配,修改Canal参数即可。
这个方案是通过增加消费者来提高MQ的吞吐量,使数据处理更快实时性更高,如果要更进一步提高吞吐量还有什么方法呢?这里我能够想到的是在Consumer这里接收到消息后将消息放入队列中而不是直接处理,然后再根据单个Consumer中该队列中的house_code并发消费,结构如下:
但是就目前而言上面的方案已经足以解决问题,就不需要再将程序搞复杂了,待以后确实需要性能优化再考虑这样的方案吧
五、参考
Canal官方文档提供的相关配置
canal.mq.partitionHash 表达式说明:
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔
- 例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
- 例子2:.\..:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
- 例子3:.\..:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
- 例子4: 匹配规则啥都不写,则默认发到0这个partition上
- 例子5:.\.. ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
- 例子6: test\.test:id,.\..* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
mq顺序性问题
binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答
- canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
- canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
- canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
- canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
- canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
- 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
- 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
- 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意