热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RocketMq(四)运行原理

前面已经对于目前订单系统的问题使用mq解决了四个问题:此时还要两个问题要进行解决,不过在解决之前我们先来看看rocketmq的一些运行原理结构一、生成者

前面已经对于目前订单系统的问题使用mq解决了四个问题:

此时还要两个问题要进行解决,不过在解决之前我们先来看看rocketmq的一些运行原理结构


一、生成者如何发送消息的


1、消息在broker是怎么存储的

首先需要明白MessageQueue,一个类型的topic内可以有多个MessageQueue,也就是队列用来存放消息的。可以在建立topic的时候指定MessageQueue的数量。

假如我们现在有一个topic,然后为他指定了4个MessageQueue,此时来看看在broker集群下是怎么分布的。(我们知道topic是分布式存储在多个broker中的)

这里的MessageQueue本质上就是一个数据分片的机制,在这个机制中假如你一个topic有1万条数据,然后这个topic有4个MessageQueue,此时大致每个MessageQueue就会存2500个消息。(通过MessageQueue将一个topic数据拆分为多个数据分片)

知道了消息在broker上是怎么存储的,那么此时生产者是怎么知道将消息写入哪个MessageQueue?
2、发送消息到MessageQueue

此时生产者会跟NameServer进行通信获取Topic的数据信息,所以生产者就会知道一个topic中有几个MessageQueue,哪些MessageQueue在哪个broker上。

而消息发送到哪个MessageQueue上,此时可以暂时的认为是均匀的发送到各MessageQueue上,至于是否还有其他写入策略我们后面再说。

3、此时broker出现故障怎么办?
假如现在某个master broker挂了,此时会等待其他slave切换为master,但这个时机段这组broker就没有master提供写操作了。那么此时消息发送到master就会失败。

此时就会建议在producer中开启一个开关,就是sendLatencyFaultEnable

一旦打开这个开关,此时就会有一个容错机制,比如某次访问某个broker发现网络延迟有500ms,此时还无法访问,那么就会自动回避这个broker一段时间(例如接下来的3000ms内就不会访问这个broker了),然后过段时间后再去访问(此时已经切换成功或者master恢复了)。


二、Broker是怎么持久化消息的?


1、CommitLog消息的顺序写入机制

当broker上接收到一个消息的时候,broker会把这个消息直接顺序写入一个日志文件(CommitLog日志),该CommitLog有很多文件,每隔文件最多为1G,Broker在接收到消息后直接追加到这个文件的末尾。如果一个CommitLog文件写满了就会创建一个新的CommitLog文件。

2、那么MessageQueue有什么用?
前面我们说将消息顺序写入CommitLog文件中,那么我们前面说的MessageQueue有什么用?

其实在Broker中,每个MessageQueue都会有一系列的ConsumeQueue文件

就是在Broker的磁盘上,会有下面这种格式的一系列文件:$HOME/store/consumequeue/{topic}/{queueId}/{filename}

这时topic表示哪个topic,queueId表示哪个MessageQueue。此时这些ConsumeQueue文件存储的信息——————一条消息对应在CommitLog文件中的offset偏移量(每次写入消息时不仅要对CommitLog文件进行顺序写入,还要在ConsumeQueue文件中记录各个消息在CommitLog的offset偏移量)(此时我们要去CommitLog中拿消息就可以直接根据偏移量进行快速的获取)

其实在ConsumeQueue文件中不仅记录了消息的物理位置的offset偏移量,还会记录消息的长度、tag hashcode等信息。

3、rockemq怎么优化消息写入CommitLog文件的过程

在RocketMq中,Broker基于OS操作系统的PageCache+顺序写两个机制来提升写入CommitLog文件的性能。

首先顺序就是直接在文件末尾加一条数据即可。

其次,数据写入CommitLog时不是直接写入底层的物理磁盘文件的,而是先写入OS的PageCache内存缓存中,然后再由OS的后台线程选择一个时间进行异步刷盘。

这样虽然可以大量的优化性能,但此时要是os的pagecache有部分数据没写入磁盘中,此时机器挂掉了,那么不久造成了数据丢失的现象?这个问题在后面解答

4、同步刷盘和异步刷盘

此时看完上面的刷盘策略,这不就是异步刷盘的模式吗?
虽然异步刷盘性能更高,但存在数据丢失的情况

此时如果要求数据持久化不会出现数据丢失现象,此时可以使用另一种刷盘模式——同步刷盘,即消息发送到broker后需要强制将这个消息刷入磁盘后才会返回ack给生产者,此时才表示消息写入成功。

此时要是master broker没有把消息刷入磁盘就挂了,此时生产者会接收到发送失败的回应,此时就会进行重发,知道主从切换成功。这样也是不会导致数据丢失的。

虽然同步刷盘可以保证数据不丢失,但是其性能是比不上异步刷盘的。

那么此时我们该如何选择这两种策略?

对于同步/异步刷盘,同步/异步复制的选择:
https://blog.csdn.net/guyue35/article/details/105674044

对于日志类型这种场景,可以允许数据的丢失,但是要求比较高的吞吐量,可以采用异步刷盘的方式。另外非核心的业务场景,不涉及重要核心数据变更的场景,也可以使用异步刷盘,比如订单支付成功,发送短信这种场景。但是对于涉及到核心的数据变更的场景,就需要使用同步刷盘,比如订单支付成功后扣减库存。


三、基于DLedger技术的Broker主从同步原理(Raft协议)


1、基于DLedger技术替换Broker的CommitLog

RocketMq的自动主从切换是基于DLedger实现的。

DLedger技术实际上它就有自己的一个CommitLog机制,你把数据给它,它会写入到CommitLog磁盘文件中去。即用先DLedger来关路CommitLog。

然后Broker再基于DLedger管理的CommitLog去构建出各个ConsumeQueue文件。

(每个Broker上都有一个DLedger组件)

2、DLedger是如何基于Raft协议选举Leader Broker?

DLedger是基于Raft协议进行Leader Broker选举的,那么Raft协议具体是怎么进行选举的。

当master机器宕机后,假如此时有三台slave,此时三台slave需要进行一次选举投票。

假如三台机器都投了自己并把投票结果发送给其他slave。那么此时的投票结果就是ABC三台机器都是一票,这样就选不出来leader了,此时让ABC各自休眠一个随机时间(例如A1秒,B1.5秒,c2秒),然后A第一个醒会投给自己然后将自己的投票结果发给BC,BC醒后发现A投给了它自己,此时BC都会将票给到A。所以最后A就得到了3票成功成为Leader。

(只要获得 (机器数/2)+1 就会成为Leader)

总结:Raft协议中选举Leader算法中确保有人可以成为Leader的核心机制就是一轮选举不出来Leader,就让大家都随机休眠一下,先醒的人会投票给自己,其他后面醒来的人发现自己收到选票了就会直接投给那个人。

3、DLedger基于Raft协议进行多副本同步?
Leader Broker是如何基于Raft协议同步数据给其他follow  Broker的。

简单来说,数据同步分为两个阶段,一个是uncommitted阶段,一个是commited阶段。

首先Leader Broker上的DLedger接收到一条数据后进行持久化,然后会标记这个消息为uncommitted状态,然后通过自己的DLedgerServer组件把这个uncomitted消息发送到Follower Broker的DLedgerServer,当Follower保存完这个消息后就给Leader的DLedgerServer返回一个ack,然后Leader会统计如果超过半数的Follower有返回ack则会将这个消息标记为committed状态(然后返回给生产者)。然后Leader的DLedgerServer就会发送commited消息给各个Follower的DLedgerServer,让他们也把消息改为commited。

4、leader宕机了怎么办?
如果Leader宕机了,此时会选举一个follower出来,然后由新的leader去做同步数据操作

(那么此时选举的投票是投数据最保存最完整的?还是说随机?)(本次leader commit的消息网络波动的slave没接受到,leader会记录每个slave对应读取到commit index的位置下次在发送数据过去的时候会把就数据带上这部分数据slave接受到后根据自己当前的index来判断这部分消息是否存在不存在则会保存 不会出现数据丢失问题。只有commit index完整的follower才能成为leader)

其实对于Raft协议,专栏中写的并不清楚,后续会自己详细的分析下Raft协议的具体实现原理。


四、消费者如何获取消息且进行ack的?


1、什么是消费组

简单点讲就是多个类似消费者的组合,例如我们有一个Topic叫”TopicOrderPaySuccess“,然后库存系统、积分系统、营销系统、仓储系统都要去消费这个Topic中的数据。

此时我们就会给各个系统中分别起一个消费组的名字例如:stock_consumer_group、marketing_consumer_group、credie_consumer_group、wms_consumer_group。(每个消费组可以有多个消费者)

2、消息发送到broker后,消费者如何进行消费的?

不同的消费组会订阅各自的topic,然后去各自的Topic中拉取消息
这里会有两种消费模式:集群模式消费(默认常用)和广播模式消费

集群消费模式:即一条消息到消费组后,该消息只能被消费组中的一个消费者消费。

广播模式:如果消费组获取到一条消息,则该组的所有消费者都会消费这个消息。

(可以通过sonsumer.setMessageModel(MessageModel.BROADCASTING)来设置为广播模式)

3、Pull模式和Push模式

消费者消费消息在RocketMQ中可以采用Push和Pull模式,具体区别看:https://blog.csdn.net/qq_21383435/article/details/101113808

https://my.oschina.net/xinxingegeya/blog/956370

实际上,这两个消费模式的本质是一样的,都是通过消费者机器主从发送请求到Broker机器去拉取一批消息下来。

Push模式本质就是基于消费组主动拉取的模式来实现的(Pull),只不过它的名字叫Push而已,意思是Broker会尽可能实时地把消息交给消费者来处理,所以其时效性更好。

我们一般都是基于Push模式进行消费的,因为Pull模式写起来更加复杂繁琐,而且Push底层就是基于Pull模式来实现的,只不过RocketMq帮我们实现了封装,时效性更好。

Push模式中消费者会去Broker拉取一批消息,如果有新的消息会立马拉取到消费者进行处理,在处理完这些消息后会立刻在去Broker发请求看看还有没有消息。

Push模式下有一个请求挂起和长轮询的机制:
长轮询和请求挂起:当你的请求到Broker后,发现没有新的消息给你处理,此时就会让请求线程挂起,默认是15秒,然后这个期间Broker会定时的有后台线程轮询地去检查是否有新消息给你,如果有新消息会主动唤醒挂起的线程然后将消息给你。

4、Broker收到消费者请求后是怎么将消息给消费者地?
假设我现在消费者向Broker发送请求,告诉Broker我要拉取MessageQueue0中地消息,然后我之前都没拉取过消息,所以此时就从MessageQueue0第一条消息开始拉取。

此时Broker就会找到MessageQueue0对应的consumerQueue0文件,从里面找到第一条信息的offset地址。

接着就根据ConsumeQueue0中找到的第一条消息的地址,去CommitLog中根据这个offset地址去读取这条数据,然后把这条消息的数据返回给消费者。

总结一下就是broker根据你要消费的MessageQueue以及开始消费的位置,然后去相应的consumerQueue0文件中找到相应消息的offset地址,然后根据这个offset地址去CommitLog文件中获取消息数据。

5、消费者如何处理消息、怎么记录上次的消费记录?
当我们消费成功后会回调一个我们注册的函数:

消费成功后给Broker返回成功消费的信息。

然后Broker收到ack后会记录我们消费组的消费进度(用一个ConsumerOffset去记录我们的消费进度)那么下次消费组再进行拉取这个ConsumeQueue的消息时,就会从Broker记录的消费位置开始拉取,这样就不会从头进行拉取了。(也可以设置为从头开始拉取)

6、如果消费组中出现机器宕机或者扩容加机器会怎么处理?

这个时候会进入一个rebalance环节,也就是给各个消费者分配他们要处理的MessageQueue。

这里提一些问题:

问题一. 根据上一节的讲解和代码, 只有处理完逻辑后, 返回ConsumeSuccess 才算处理 完了. 问题二: 收到消息还没处理完就宕机了, 就还没有给broker返回消费成功的标记, Broker就不会存储这条消息的消费进度, 等消费者重启后, 再次消费消息时, 会从上一次的偏移量消息开始消费. 问题三: 如果处理完了消息, 但没有来得及提交进度, 那么broker会认为这条消息还没有被重新消费, 消费者重启后, 会重新从这条消息开始拉取消息, 此时就是重复消费了, 就需要做幂等性处理.(这里要考虑幂等性,那不就所以信息都需要进行幂等性保证了???)

7、消费者用什么策略从Master或者Slave上拉取消息的?

前面说过,刚开始消费者是连接到Master Broker机器去拉取消息的,然后如果master机器觉得自己负载比较高,就会告诉消费者,下次可以从Slave机器进行拉取。

之前说过CommitLog文件的消息写入时基于os cache(内存级别的)+顺序写从而提高吞吐量的,而ConsumeQueue文件的操作也是基于os cache进行的(ConsumeQueue文件一个就几M,每个文件可以存30条信息,所以ConsumeQueue文件几乎可以都在os cache中的)

那么此时master什么时候会让你从Slave拉取数据?
假如你写入了10万条数据,此时仅消费了2万条,此时还有8万条数据没拉取,则下次会从20001条数据开始拉取。然后broker知道自己的物理内存大概能存多少条信息(因为CommitLog文件的读取是需要将磁盘的文件读取到os cache中的,ConsumeQueue文件很小几乎可以不用理它),例如它现在知道自己只能利用os cache存5万条数据(加上之前2万条的缓存数据,此时这里的os cache只能再从磁盘中拉取3万条消息),但后面消费者是需要拉取8万条数据的,经过判断broker认为自己的负载比较高,其余的5万条消息可能无法及时给你,所以此时就会跟消费者说,这次我可以给你读取3万条消息,下次你还是从slave中去拉取把。

(在现代计算机系统中,CPU,RAM,DISK的速度不相同。CPU与RAM之间,RAM与DISK之间的速度差异常常是指数级。为了在速度和容量上折中,在CPU与RAM之间使用CPU cache以提高访存速度,在RAM与磁盘之间,操作系统使用page cache提高系统对文件的访问速度。访问磁盘文件一般都是要先加载到os cache的)

(OS cache是基于内存和磁盘之间,但是os cache用的都是机器的内存,所以你可以把它看出内存)

(如果你拉取的消息超过最大能使用的内存的量,那么就说明你后续会频繁从磁盘加载数据,这时就会让你从slave去加载数据了)

几个问题:

1、消费者只会和自己对应要拉取消息的topic下的messagequeue所在的broker节点建立连接。

2、kafka是支持主从架构下读写分离的,rabbitmq并没有读写分离,它的集群模式是per-per的,每个节点都支持读写,然后把数据同步给其他节点。

3、kafka和rabbitmq都存在数据不一致的情况。

4、主要是提升消息的处理效率;另外一种方式就是采用异步方式处理消息,也就是拉取到消息就直接提交处理完成的信息给broker,但这里可能会出现数据丢失的问题。


 


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 进程(Process)是指计算机中程序对特定数据集的一次运行活动,是系统资源分配与调度的核心单元,构成了操作系统架构的基础。在早期以进程为中心的计算机体系结构中,进程被视为程序的执行实例,其状态和控制信息通过任务描述符(task_struct)进行管理和维护。本文将深入探讨进程的概念及其关键数据结构task_struct,解析其在操作系统中的作用和实现机制。 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • 在 Linux 系统中,`/proc` 目录实现了一种特殊的文件系统,称为 proc 文件系统。与传统的文件系统不同,proc 文件系统主要用于提供内核和进程信息的动态视图,通过文件和目录的形式呈现。这些信息包括系统状态、进程细节以及各种内核参数,为系统管理员和开发者提供了强大的诊断和调试工具。此外,proc 文件系统还支持实时读取和修改某些内核参数,增强了系统的灵活性和可配置性。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • 西北工业大学作为陕西省三所985和211高校之一,虽然在农业和林业领域不如某些顶尖院校,但在航空航天领域的实力尤为突出。该校的计算机科学专业在科研和教学方面也具有显著优势,是考研的理想选择。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • 本文深入探讨了TCP协议的高可靠性特点及其广泛应用。TCP协议通过多种机制确保数据传输的准确性与稳定性,包括但不限于校验和验证、数据包分割与重组以及超时重传机制。这些机制共同作用,使得TCP成为互联网通信中最可靠的数据传输协议之一。 ... [详细]
  • 背景最近面试面得心力交瘁,由于没有高并发架构的实际项目经验,经常是在场景设计的面试题目上面栽跟头。上次就被问到了关于秒杀系统的设计,竟无 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
  • SpringCloud之Bus(消息总线)
    说明:关于SpringCloud系列的文章中的代码都在码云上面地址:https:gitee.comzh_0209_javaspringcloud-ali ... [详细]
  • .Net下RabbitMQ发布订阅模式实践
    一、概念AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的 ... [详细]
  • 启动activemq_「Java」SpringBoot amp; ActiveMQ
    一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构, ... [详细]
author-avatar
EIght_16
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有