作者:EIght_16 | 来源:互联网 | 2023-08-25 13:33
前面已经对于目前订单系统的问题使用mq解决了四个问题:
此时还要两个问题要进行解决,不过在解决之前我们先来看看rocketmq的一些运行原理结构
一、生成者如何发送消息的
1、消息在broker是怎么存储的
首先需要明白MessageQueue,一个类型的topic内可以有多个MessageQueue,也就是队列用来存放消息的。可以在建立topic的时候指定MessageQueue的数量。
假如我们现在有一个topic,然后为他指定了4个MessageQueue,此时来看看在broker集群下是怎么分布的。(我们知道topic是分布式存储在多个broker中的)
这里的MessageQueue本质上就是一个数据分片的机制,在这个机制中假如你一个topic有1万条数据,然后这个topic有4个MessageQueue,此时大致每个MessageQueue就会存2500个消息。(通过MessageQueue将一个topic数据拆分为多个数据分片)
知道了消息在broker上是怎么存储的,那么此时生产者是怎么知道将消息写入哪个MessageQueue?
2、发送消息到MessageQueue
此时生产者会跟NameServer进行通信获取Topic的数据信息,所以生产者就会知道一个topic中有几个MessageQueue,哪些MessageQueue在哪个broker上。
而消息发送到哪个MessageQueue上,此时可以暂时的认为是均匀的发送到各MessageQueue上,至于是否还有其他写入策略我们后面再说。
3、此时broker出现故障怎么办?
假如现在某个master broker挂了,此时会等待其他slave切换为master,但这个时机段这组broker就没有master提供写操作了。那么此时消息发送到master就会失败。
此时就会建议在producer中开启一个开关,就是sendLatencyFaultEnable
一旦打开这个开关,此时就会有一个容错机制,比如某次访问某个broker发现网络延迟有500ms,此时还无法访问,那么就会自动回避这个broker一段时间(例如接下来的3000ms内就不会访问这个broker了),然后过段时间后再去访问(此时已经切换成功或者master恢复了)。
二、Broker是怎么持久化消息的?
1、CommitLog消息的顺序写入机制
当broker上接收到一个消息的时候,broker会把这个消息直接顺序写入一个日志文件(CommitLog日志),该CommitLog有很多文件,每隔文件最多为1G,Broker在接收到消息后直接追加到这个文件的末尾。如果一个CommitLog文件写满了就会创建一个新的CommitLog文件。
2、那么MessageQueue有什么用?
前面我们说将消息顺序写入CommitLog文件中,那么我们前面说的MessageQueue有什么用?
其实在Broker中,每个MessageQueue都会有一系列的ConsumeQueue文件
就是在Broker的磁盘上,会有下面这种格式的一系列文件:$HOME/store/consumequeue/{topic}/{queueId}/{filename}
这时topic表示哪个topic,queueId表示哪个MessageQueue。此时这些ConsumeQueue文件存储的信息——————一条消息对应在CommitLog文件中的offset偏移量(每次写入消息时不仅要对CommitLog文件进行顺序写入,还要在ConsumeQueue文件中记录各个消息在CommitLog的offset偏移量)(此时我们要去CommitLog中拿消息就可以直接根据偏移量进行快速的获取)
其实在ConsumeQueue文件中不仅记录了消息的物理位置的offset偏移量,还会记录消息的长度、tag hashcode等信息。
3、rockemq怎么优化消息写入CommitLog文件的过程
在RocketMq中,Broker基于OS操作系统的PageCache+顺序写两个机制来提升写入CommitLog文件的性能。
首先顺序就是直接在文件末尾加一条数据即可。
其次,数据写入CommitLog时不是直接写入底层的物理磁盘文件的,而是先写入OS的PageCache内存缓存中,然后再由OS的后台线程选择一个时间进行异步刷盘。
这样虽然可以大量的优化性能,但此时要是os的pagecache有部分数据没写入磁盘中,此时机器挂掉了,那么不久造成了数据丢失的现象?这个问题在后面解答
4、同步刷盘和异步刷盘
此时看完上面的刷盘策略,这不就是异步刷盘的模式吗?
虽然异步刷盘性能更高,但存在数据丢失的情况
此时如果要求数据持久化不会出现数据丢失现象,此时可以使用另一种刷盘模式——同步刷盘,即消息发送到broker后需要强制将这个消息刷入磁盘后才会返回ack给生产者,此时才表示消息写入成功。
此时要是master broker没有把消息刷入磁盘就挂了,此时生产者会接收到发送失败的回应,此时就会进行重发,知道主从切换成功。这样也是不会导致数据丢失的。
虽然同步刷盘可以保证数据不丢失,但是其性能是比不上异步刷盘的。
那么此时我们该如何选择这两种策略?
对于同步/异步刷盘,同步/异步复制的选择:
https://blog.csdn.net/guyue35/article/details/105674044
对于日志类型这种场景,可以允许数据的丢失,但是要求比较高的吞吐量,可以采用异步刷盘的方式。另外非核心的业务场景,不涉及重要核心数据变更的场景,也可以使用异步刷盘,比如订单支付成功,发送短信这种场景。但是对于涉及到核心的数据变更的场景,就需要使用同步刷盘,比如订单支付成功后扣减库存。
三、基于DLedger技术的Broker主从同步原理(Raft协议)
1、基于DLedger技术替换Broker的CommitLog
RocketMq的自动主从切换是基于DLedger实现的。
DLedger技术实际上它就有自己的一个CommitLog机制,你把数据给它,它会写入到CommitLog磁盘文件中去。即用先DLedger来关路CommitLog。
然后Broker再基于DLedger管理的CommitLog去构建出各个ConsumeQueue文件。
(每个Broker上都有一个DLedger组件)
2、DLedger是如何基于Raft协议选举Leader Broker?
DLedger是基于Raft协议进行Leader Broker选举的,那么Raft协议具体是怎么进行选举的。
当master机器宕机后,假如此时有三台slave,此时三台slave需要进行一次选举投票。
假如三台机器都投了自己并把投票结果发送给其他slave。那么此时的投票结果就是ABC三台机器都是一票,这样就选不出来leader了,此时让ABC各自休眠一个随机时间(例如A1秒,B1.5秒,c2秒),然后A第一个醒会投给自己然后将自己的投票结果发给BC,BC醒后发现A投给了它自己,此时BC都会将票给到A。所以最后A就得到了3票成功成为Leader。
(只要获得 (机器数/2)+1 就会成为Leader)
总结:Raft协议中选举Leader算法中确保有人可以成为Leader的核心机制就是一轮选举不出来Leader,就让大家都随机休眠一下,先醒的人会投票给自己,其他后面醒来的人发现自己收到选票了就会直接投给那个人。
3、DLedger基于Raft协议进行多副本同步?
Leader Broker是如何基于Raft协议同步数据给其他follow Broker的。
简单来说,数据同步分为两个阶段,一个是uncommitted阶段,一个是commited阶段。
首先Leader Broker上的DLedger接收到一条数据后进行持久化,然后会标记这个消息为uncommitted状态,然后通过自己的DLedgerServer组件把这个uncomitted消息发送到Follower Broker的DLedgerServer,当Follower保存完这个消息后就给Leader的DLedgerServer返回一个ack,然后Leader会统计如果超过半数的Follower有返回ack则会将这个消息标记为committed状态(然后返回给生产者)。然后Leader的DLedgerServer就会发送commited消息给各个Follower的DLedgerServer,让他们也把消息改为commited。
4、leader宕机了怎么办?
如果Leader宕机了,此时会选举一个follower出来,然后由新的leader去做同步数据操作
(那么此时选举的投票是投数据最保存最完整的?还是说随机?)(本次leader commit的消息网络波动的slave没接受到,leader会记录每个slave对应读取到commit index的位置下次在发送数据过去的时候会把就数据带上这部分数据slave接受到后根据自己当前的index来判断这部分消息是否存在不存在则会保存 不会出现数据丢失问题。只有commit index完整的follower才能成为leader)
其实对于Raft协议,专栏中写的并不清楚,后续会自己详细的分析下Raft协议的具体实现原理。
四、消费者如何获取消息且进行ack的?
1、什么是消费组
简单点讲就是多个类似消费者的组合,例如我们有一个Topic叫”TopicOrderPaySuccess“,然后库存系统、积分系统、营销系统、仓储系统都要去消费这个Topic中的数据。
此时我们就会给各个系统中分别起一个消费组的名字例如:stock_consumer_group、marketing_consumer_group、credie_consumer_group、wms_consumer_group。(每个消费组可以有多个消费者)
2、消息发送到broker后,消费者如何进行消费的?
不同的消费组会订阅各自的topic,然后去各自的Topic中拉取消息
这里会有两种消费模式:集群模式消费(默认常用)和广播模式消费
集群消费模式:即一条消息到消费组后,该消息只能被消费组中的一个消费者消费。
广播模式:如果消费组获取到一条消息,则该组的所有消费者都会消费这个消息。
(可以通过sonsumer.setMessageModel(MessageModel.BROADCASTING)来设置为广播模式)
3、Pull模式和Push模式
消费者消费消息在RocketMQ中可以采用Push和Pull模式,具体区别看:https://blog.csdn.net/qq_21383435/article/details/101113808
https://my.oschina.net/xinxingegeya/blog/956370
实际上,这两个消费模式的本质是一样的,都是通过消费者机器主从发送请求到Broker机器去拉取一批消息下来。
Push模式本质就是基于消费组主动拉取的模式来实现的(Pull),只不过它的名字叫Push而已,意思是Broker会尽可能实时地把消息交给消费者来处理,所以其时效性更好。
我们一般都是基于Push模式进行消费的,因为Pull模式写起来更加复杂繁琐,而且Push底层就是基于Pull模式来实现的,只不过RocketMq帮我们实现了封装,时效性更好。
Push模式中消费者会去Broker拉取一批消息,如果有新的消息会立马拉取到消费者进行处理,在处理完这些消息后会立刻在去Broker发请求看看还有没有消息。
Push模式下有一个请求挂起和长轮询的机制:
长轮询和请求挂起:当你的请求到Broker后,发现没有新的消息给你处理,此时就会让请求线程挂起,默认是15秒,然后这个期间Broker会定时的有后台线程轮询地去检查是否有新消息给你,如果有新消息会主动唤醒挂起的线程然后将消息给你。
4、Broker收到消费者请求后是怎么将消息给消费者地?
假设我现在消费者向Broker发送请求,告诉Broker我要拉取MessageQueue0中地消息,然后我之前都没拉取过消息,所以此时就从MessageQueue0第一条消息开始拉取。
此时Broker就会找到MessageQueue0对应的consumerQueue0文件,从里面找到第一条信息的offset地址。
接着就根据ConsumeQueue0中找到的第一条消息的地址,去CommitLog中根据这个offset地址去读取这条数据,然后把这条消息的数据返回给消费者。
总结一下就是broker根据你要消费的MessageQueue以及开始消费的位置,然后去相应的consumerQueue0文件中找到相应消息的offset地址,然后根据这个offset地址去CommitLog文件中获取消息数据。
5、消费者如何处理消息、怎么记录上次的消费记录?
当我们消费成功后会回调一个我们注册的函数:
消费成功后给Broker返回成功消费的信息。
然后Broker收到ack后会记录我们消费组的消费进度(用一个ConsumerOffset去记录我们的消费进度)那么下次消费组再进行拉取这个ConsumeQueue的消息时,就会从Broker记录的消费位置开始拉取,这样就不会从头进行拉取了。(也可以设置为从头开始拉取)
6、如果消费组中出现机器宕机或者扩容加机器会怎么处理?
这个时候会进入一个rebalance环节,也就是给各个消费者分配他们要处理的MessageQueue。
这里提一些问题:
问题一. 根据上一节的讲解和代码, 只有处理完逻辑后, 返回ConsumeSuccess 才算处理 完了. 问题二: 收到消息还没处理完就宕机了, 就还没有给broker返回消费成功的标记, Broker就不会存储这条消息的消费进度, 等消费者重启后, 再次消费消息时, 会从上一次的偏移量消息开始消费. 问题三: 如果处理完了消息, 但没有来得及提交进度, 那么broker会认为这条消息还没有被重新消费, 消费者重启后, 会重新从这条消息开始拉取消息, 此时就是重复消费了, 就需要做幂等性处理.(这里要考虑幂等性,那不就所以信息都需要进行幂等性保证了???)
7、消费者用什么策略从Master或者Slave上拉取消息的?
前面说过,刚开始消费者是连接到Master Broker机器去拉取消息的,然后如果master机器觉得自己负载比较高,就会告诉消费者,下次可以从Slave机器进行拉取。
之前说过CommitLog文件的消息写入时基于os cache(内存级别的)+顺序写从而提高吞吐量的,而ConsumeQueue文件的操作也是基于os cache进行的(ConsumeQueue文件一个就几M,每个文件可以存30条信息,所以ConsumeQueue文件几乎可以都在os cache中的)
那么此时master什么时候会让你从Slave拉取数据?
假如你写入了10万条数据,此时仅消费了2万条,此时还有8万条数据没拉取,则下次会从20001条数据开始拉取。然后broker知道自己的物理内存大概能存多少条信息(因为CommitLog文件的读取是需要将磁盘的文件读取到os cache中的,ConsumeQueue文件很小几乎可以不用理它),例如它现在知道自己只能利用os cache存5万条数据(加上之前2万条的缓存数据,此时这里的os cache只能再从磁盘中拉取3万条消息),但后面消费者是需要拉取8万条数据的,经过判断broker认为自己的负载比较高,其余的5万条消息可能无法及时给你,所以此时就会跟消费者说,这次我可以给你读取3万条消息,下次你还是从slave中去拉取把。
(在现代计算机系统中,CPU,RAM,DISK的速度不相同。CPU与RAM之间,RAM与DISK之间的速度差异常常是指数级。为了在速度和容量上折中,在CPU与RAM之间使用CPU cache以提高访存速度,在RAM与磁盘之间,操作系统使用page cache提高系统对文件的访问速度。访问磁盘文件一般都是要先加载到os cache的)
(OS cache是基于内存和磁盘之间,但是os cache用的都是机器的内存,所以你可以把它看出内存)
(如果你拉取的消息超过最大能使用的内存的量,那么就说明你后续会频繁从磁盘加载数据,这时就会让你从slave去加载数据了)
几个问题:
1、消费者只会和自己对应要拉取消息的topic下的messagequeue所在的broker节点建立连接。
2、kafka是支持主从架构下读写分离的,rabbitmq并没有读写分离,它的集群模式是per-per的,每个节点都支持读写,然后把数据同步给其他节点。
3、kafka和rabbitmq都存在数据不一致的情况。
4、主要是提升消息的处理效率;另外一种方式就是采用异步方式处理消息,也就是拉取到消息就直接提交处理完成的信息给broker,但这里可能会出现数据丢失的问题。