一、基础概念
1、MQ持久化机制
如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的,在高并发场景下是不能够接受的,吞吐量太低了。
所以MQ持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。
2、confirm机制的原理
(1)消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者;
(2)如果消息接收不成功,MQ会返回一个nack消息给生产者;
3、消息幂等
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。
只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现。
4、消息堆积
消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:
- 消息堆积在内存Buffer:一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
- 消息堆积到持久化存储系统中:例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
二、如何避免消息丢失
1、丢失原因?
第一种情况:一般MQ中间件为了提高系统的吞吐量会把消息保存在内存中,如果不作其他处理,MQ服务器一旦宕机,消息将全部丢失。这个是业务不允许的,造成很大的影响。
第二种情况: 就是消息刚刚保存到MQ内存中,但还没有来得及更新到磁盘文件中,突然宕机了。这个场景在持续的大量消息投递的过程中,会很常见。
2、避免消息丢失
持久化:将MQ消息持久化;
confirm机制:消息发送端接受MQ的confirm通知;
通过持久化和confirm机制,这样是不是就可以保障100%消息不丢失了呢?
comfirm机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够100%保障消息不丢失,因为即使加上了confirm机制,消息在MQ内存中还没有刷盘到磁盘就宕机了,还是没法处理。
3、辅助手段
生产者在投递消息之前,可以在本地数据库建一张消息表,先把消息持久化到Redis或DB中,这样就可以利用本地数据库的事务机制。事务提交成功后,将消息表中的消息转移到消息队列中。
confirm机制监听消息是否发送成功?如ack成功消息,删除DB中此消息或者修改消息表状态。
如果nack不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定。
三、如何避免消息重复消费
1、重复消费的问题?
导致重复消费的原因可能出现在生产者,也可能出现在 MQ 或 消费者。
这里说的重复消费问题是指同一个数据被执行了两次,不单单指 MQ 中一条消息被消费了两次,也可能是 MQ 中存在两条一模一样的消费。
- 生产者:生产者可能会重复推送一条数据到 MQ 中,为什么会出现这种情况呢?也许是一个 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;也可能是推送消息到 MQ 时响应比较慢,生产者的重试机制导致再次推送了一次消息。
- MQ:在消费者消费完一条数据响应 ack 信号消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。
- 消费者:消费者已经消费完了一条消息,正准备但是还未给 MQ 发送 ack 信号时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。
2、如何保证幂等性?
去重原则:使用业务端逻辑保持幂等性
去重策略:保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现。
建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
消费者怎么解决重复消费问题呢?这里提供两种方法:
- 状态判断法:消费者消费数据后把消费数据记录在 redis 中,下次消费时先到 redis 中查看是否存在该消息,存在则表示消息已经消费过,直接丢弃消息。
- 业务判断法:通常数据消费后都需要插入到数据库中,使用数据库的唯一性约束防止重复消费。每次消费直接尝试插入数据,如果提示唯一性字段重复,则直接丢失消息。一般都是通过这个业务判断的方法就可以简单高效地避免消息的重复处理了。
四、如何解决消息积压
1、消息积压处理办法
首要考虑临时紧急扩容,比如:利用K8s弹性扩容,增加消费者数量;
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer 机器来消费消息。
2、MQ中消息失效
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq里,而是大量的数据会直接搞丢。
我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如:
大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。
假设 1 万个订单积压在 mq 里面,没有处理,其中 1000个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
3、MQ消息队列快满了
如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?
没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧 。
五、如何保证消息的顺序性
1、为什么要保证顺序
消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例:
Producer先后发送了2条消息,一条insert,一条update,分别分配到2台Queue中,消费者组中的两台机器分别处理这两个Queue中的消息,这时候顺序是无法保证的;
2、出现顺序错乱的场景
(1)RabbtiMQ
- a、一个queue,有多个consumer去消费,这样就会造成顺序的错误。consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
- b、一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。
(2)RocketMQ
- a、RocketMQ一个topic,一个queue,一个consumer,但是consumer内部进行多线程消费,这样数据也会出现顺序错乱问题。
- b、具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
3、保证消息的消费顺序
(1)RabbtiMQ
- a、拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。
- b、或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理。
(2)RocketMQ/Kafka
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是分区(queue)顺序。要全局顺序只能一个分区。 之所以出现你这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区)。
- a、分区顺序:拆分多个queue,然后N个线程分别消费一个queue。
- b、全局顺序:确保同一个消息发送到同一个partition,一个topic,一个分区(queue),一个consumer,内部单线程消费。
六、延时消费
启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。
cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:
- 当数据量大的时候轮询效率低;
- 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
- 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;
(1)RabbtiMQ
RabbitMQ延迟队列的核心思路是:TTL消息/队列 + DLX死信队列
TTL是Time To Live的缩写,也就是生存时间的意思;
DLX死信队列
虽然叫队列,但其实指的是Exchange
,或者说指的Exchange
和它所属的Queue
,他俩一块构成了死信队列。
(2)RocketMQ
RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。
注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
七、回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。
例如:由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。
RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。
感觉从生产端回溯更加合理,需要生产端提供可回溯的业务机制。