热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

终于来了...RocketMQ扫盲篇

java基础教程栏目今天详细介绍有关RocketMQ知识。

java基础教程栏目今天详细介绍有关RocketMQ知识。

图片来自gitee.com/mirrors/roc…

可以看到RocketMQ主要有四个组件:

NameServer

  • 无状态服务,注册中心,可集群部署,但是NameServer节点之间没有任何数据交互。
  • Borker会以定时把Topic路由信息上报给所有的NameServer。Producer、Consumer会随机选择一个NameServer定时Topic更新路由信息。
  • Topic路由信息在NameServer集群中采用最终一致性。
  • 保证AP。

Borker

  • RocketMQ的服务端,用于存储消息、分发消息。
  • Borker会定时把自身拥有的所有的Topic路由信息上报给NameServer。
  • Borker有两个角色:Master、Follower,Master承担读(消费消息)写(生产消息)操作,如果Master比较忙,或者不可用,Follower可以承担读操作。BorkerId=0,代表是Matser,BorkerId!=0,代表是Follower,需要注意的有两点: 其一,目前为止,BorkerId=1的Follower才可以承担读操作; 其二,只有较高版本的RocketMQ才支持当Master节点挂掉,Follower自动升级到Master。

Producer

生产者,每隔一定时间向NameServer发起Topic的路由信息查询。

Consumer

消费者,每隔一定时间向NameServer发起Topic的路由信息查询。

为什么注册中心不选用Zookeeper

其实,在低版本的RocketMQ中,确实是选用Zookeeper作为注册中心的,但是后面改成了现在的NameServer,猜想主要原因是:

  • RocketMQ已经是一个中间件了,不想再依赖其他中间件。
  • Zookeeper比较重,有很多功能RocketMQ是用不到的,不如写一个轻量级的注册中心。
  • Zookeeper是CP,一旦触发领导选举,那么注册中心就不可用了,而RocketMQ的注册中心,不需要强一致性,只要保证最终一致性。

RocketMQ消息领域模型

Message

  • 传输的消息。
  • 消息必须有Topic。
  • 消息可以有多个Tag和多个Key,可以看做消息的附加属性。

Topic

  • 一类消息的集合。
  • 每个消息必须有一个Topic。
  • 消息的第一级类型。

Tag

  • 一个消息除了有Topic之外,还可以有Tag,用来细分同一个Topic下的不同种类的消息。
  • Tag不是必须的。
  • 消息的第二级类型。

Group

分为ProducerGroup,ConsumerGroup,我们更多的是关注ConsumerGroup,ConsumerGroup包含多个Consumer。

在集群消费模式下,一个ConsumerGroup下的Consumer共同消费一个Topic,且每个Consumer会被分配到N个队列,但是一个队列只会被一个Consumer消费,不同的ConsumerGroup可以消费同一个Topic,一条消息会被订阅此Topic的所有ConsumerGroup消费。

Queue

  • 一个Topic默认包含四个Queue。
  • 在集群消费模式下,同一个ConsumerGroup中的Consumer可以消费多个Queue的消息,但是一个Queue只能被一个Consumer消费。
  • Queue中的消息是有序的。
  • 分为读Queue和写Queue,一般来说,读Queue的数量和写Queue的数量是一致的,否则很容易出问题。

消费模式

消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

和其他MQ不同,其他MQ是在发送消息的时候,指定是集群消费还是广播消费,RocketMQ是在消费者端设置是集群消费还是广播消费。

Clustering(集群消费)

默认情况下是集群消费模式,该模式下,ConsumerGroup所有的Consumer共同消费一个Topic的消息,每个Consumer负责消费N个队列的消息(N也可能为1,甚至是0,没有分配到队列),但是一个队列只会被一个Consumer消费。如果某个Consumer挂掉,ConsumerGroup下的其他Consumer会接替挂掉的Consumer继续消费。

集群消费模式下,消费进度维护在Borker端,存储路径为${ROCKET_HOME}/store/config/ consumerOffset.json,如下图所示:使用topicName@consumerGroupName为Key,消费进度为Value,Value的形式是queueId:offset ,说明如果有多个ConsumerGroup,每个ConsumerGroup的消费进度是不同的,需要分开来存储。

Broadcasting(广播消费)

广播消费消息会发给ConsumerGroup中所有的Consumer。

广播消费模式下,消费进度维护在Consumer端。

消费队列负载算法与重平衡机制

消费队列负载算法

我们知道了在集群消费模式下,ConsumerGroup下所有的Consumer共同消费一个Topic的消息,每个Consumer负责消费N个队列的消息,那么具体是如何分配的呢?这就涉及到消费队列负载算法了。

RocketMQ提供了众多的消费队列负载算法,其中最常用的是两种算法,即AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle。下面我们来看下这两个算法的区别。

假设,现在一个Topic有16个队列,用q0~q15表示,有3个Consumer,用c0-c2表示。

用AllocateMessageQueueAveragely消费队列负载算法的结果如下:

  • c0:q0 q1 q2 q3 q4 q5
  • c1:q6 q7 q8 q9 q10
  • c2:q11 q12 q13 q14 q15

用AllocateMessageQueueAveragelyByCircle消费队列负载算法的结果如下:

  • c0:q0 q3 q6 q9 q12 q15
  • c1:q1 q4 q7 q10 q13
  • c2:q2 q5 q8 q11 q14

ConsumerGroup下所有的Consumer共同消费一个Topic的消息,每个Consumer负责消费N个队列的消息,但是一个队列不能同时被N个Consumer消费,这意味着什么?

聪明的你一定可以想到,如果一个Topic只有4个队列,而有5个Consumer,那么有一个Consumer将不能分配到任何队列,所以在RocketMQ中,Topic下队列的个数直接决定了Consumer的最大个数,也就说明,不能光靠增加Consumer来提高消费速度。

重平衡

虽然建议在创建Topic的时候,就应该充分考虑队列的个数,但是实际情况往往是不尽人意的,哪怕队列数没有发生改变,Consumer的数量也一定会发生改变,比如Consumer的上下线,比如某个Consumer挂了,比如新增了Consumer。队列的扩容、缩容,Consumer的扩容、缩容都会导致重平衡,也就是为Consumer重新分配消费的队列。

在RocketMQ中,Consumer会定时查询Topic的队列的个数,Consumer的个数,如果发生了改变,就会触发重平衡。

重平衡是RocketMQ内部实现的,程序员无需关心。

Pull OR Push?

一般来说,MQ有两种方法获取消息:

  • Pull:Consumer主动拉取消息,好处是Consumer可以控制拉取消息的频率,条数,Consumer知道自身的消费能力,所以在Consumer端不容易造成消息堆积,但是实时性不是太好,效率相对较低。
  • Push:Broker主动发送消息,好处是实时性、效率比较高,但是Broker无法知道Consumer端的消费能力,如果发给Consumer的消息过多,会造成Consumer端的消息堆积;如果发给Consumer的数据太少,又会造成Consumer端的空闲。

不管是Pull,还是Push,Consumer总会与Broker产生交互,交互的方式一般有短连接、长连接、轮询三种方式。

看起来,RocketMQ支持既支持Pull,也支持Push,但是实际上Push也是用Pull实现的,那么Consumer是怎么与Broker产生交互的呢?

这就是RocketMQ设计的巧妙的地方了,既不是短连接,也不是长连接,也不是轮询,而是采用的长轮询。

长轮询

Consumer发起拉取消息的请求,分为两种情况:

  • 有消息:Consumer拿到消息后,连接断开。
  • 没有消息:Borker Hold(保持)住连接一定时间,每隔5秒,检查下是否有消息,如果有消息,给Consumer,连接断开。

事务消息

RocketMQ支持事务消息,Producer把事务消息发送给Broker后,Broker会把消息存储在系统Topic:RMQ_SYS_TRANS_HALF_TOPIC,这样Consumer就无法消费到这条消息了。

Broker会有一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC的消息,向Producer发起回查,回查的状态有三种:提交、回滚、未知。

  • 如果回查的状态是提交,回滚,会触发消息的提交和回滚;
  • 如果是未知,会等待下一次回查,RocketMQ可以设置一条消息的回查间隔与回查次数,超过一定的回查次数,消息会自动回滚。

延迟消息

延迟消息是指息发到Broker后,不能立刻被Consumer消费,需要等待一定的时间才可以被消费到,RocketMQ只支持特定的延迟时间:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消费形式

RocketMQ支持两种消费形式:并发消费、顺序消费。 如果是顺序消费,需要保证排序的消息在同一个队列。如何选择队列发送呢,RocketMQ发送消息的方法有好几个重载,其中有一个重载方法支持队列的选择。

同步刷盘、异步刷盘

Producer把消息发送到Borker中,Borker是需要把消息持久化的,RocketMQ支持两种持久化策略:

  • 同步刷盘:Borker把消息持久后才返回ACK给Producer,好处是消息可靠性高,但是效率较慢。
  • 异步刷盘:Broker把消息写入到PageCache中,就返回ACK给Producer。好处是效率极高,但是如果服务器挂了,消息可能会丢失,如果只是RocketMQ服务挂了,不会造成消息丢失。

同步复制、异步复制

为了MQ的可靠性、可用性,在生产环境,一般会部署Follower节点,Follower节点会复制Master的数据,RocketMQ支持两种持复制策略:

  • 同步复制:Master、Follower都把消息写入成功,才返回ACK给Producer,可靠性较高,但是效率较慢。
  • 异步复制:只要Master写入成功,就返回ACK给Producer,效率较高,但是可能会丢失消息。

"写入"是写入PageCache,还是写入硬盘,要看Follower Broker的配置。

再谈谈Producer

RocketMQ提供了三种发送消息的方法:

  • oneway:fire and forget,单向消息,指消息发送出去后,就不管了,这个方法是没有返回值的。
  • 同步:消息发送出去后,同步等待Borker的响应。
  • 异步:消息发送出去后,立即返回,收到Boker的响应后,会执行函调方法。

在实际开发中,一般选用同步方法,如果要提高RocketMQ的性能,一般都是修改Borker端的参数,特别是刷盘策略和复制策略。

发送消息重试

消息发送时,如果使用了MessageQueueSelector,那消息发送的重试机制将会失效。

发送消息响应可能为以下四种:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

除了第一种,其他情况都是有问题的,为了保证消息不丢失,需要设置Producer参数:RetryAnotherBrokerWhenNotStoreOK为true。

故障规避机制

如果消息发送失败了,重试的时候,还是发送给这个Borker,那么大概率发送还是失败的,RockteMQ设计精巧之处在于,重试的时候,会自动避开这个Borker,而选择其他Borker,但是目前为止,异步发送没有那么智能,只会在一个Borker上重试,所以强烈建议选择同步发送方式。

RocketMQ提供了两种故障规避机制。用参数SendLatencyFaultEnable来控制。

  • false:默认值,只有在重试的时候,才会启用故障规避机制,比如发送消息给BorkerA失败了,重试的时候,会选择BorkerB,但是下次发送消息,还是会选择发送给BorkerA。
  • true:开启延迟退避机制,一旦消息发送给BorkerA失败,就会悲观的认为在一段时间内,BorkerA不可用,在将来的一段时间内,不会再向BorkerA发送消息。

延迟退避机制看起来很好用,但是一般来说Borker端繁忙,导致Borker不可用或者网络不可用只是一瞬间的事情,马上就可以恢复,如果开启了延迟退避机制,本来可用的Borker在一段时间内却被规避了,其他Borker更加繁忙,那可能情况更糟糕。

再谈谈Consumer

Consumer线程注意事项

Consumer有两个参数,可以消费的并行度,即ConsumeThreadMinConsumeThreadMax,看起来给人的感觉是,如果Consumer端堆积消息比较少,消费线程数为ConsumeThreadMin;如果Consumer端堆积消息比较多,就自动开启新的线程来消费,直到消费线程数为ConsumeThreadMax。但是并不是这样,Consumer内部持有一个线程池,选用的是无界队列,也就是ConsumeThreadMax参数是无效的,所以在实际开发中,ConsumeThreadMinConsumeThreadMax往往设置成一样。

ConsumeFromWhere

如果查询不到消费进度的时候,Consumer从哪里开始消费,RocketMQ支持从最新消息、最早消息、指定时间戳这三种方式进行消费。

消费消息重试

RocketMQ会为每个ConsumerGroup都设置一个Topic名称为%RETRY%+consumerGroup的重试队列,用来保存需要给ConsumerGroup重试的消息,但是重试需要一定的延时时间,RocketMQ对于重试消息的处理是先保存至Topic名称为SCHEDULE_TOPIC_XXXX的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至%RETRY%+consumerGroup的重试队列中。

消息堆积、消费能力不够,怎么办

  • 提高消费进度,这是最好的办法。
  • 增加队列,增加Consumer。
  • 原先的Consumer作为搬砖工,根据一定的规则把消息“搬”到多个新的Topic,再开几个ConsumerGroup去消费不同的Topic。
  • 新开一个ConsumerGroup去消费,也就是两个ConsumerGroup同时消费一个Topic,但是需要注意offset的判断,比如一个ConsumerGroup消费offset为奇数的消息,一个ConsumerGroup消费offset为偶数的消息。

本来以为写扫盲文,应该会写的很顺,但是还是想多了,因为是扫盲文,面向的是没有怎么接触过RocketMQ的小伙伴,但是RocketMQ有没有那么简单,不可能用一篇博客,就让没有怎么接触过RocketMQ的小伙伴顺利入门,所以在写博客的时候,一直在想,这个东西重要吗,需要仔细描述吗;这个东西可以忽视,可以不介绍吗 等等,大家可以看到本文基本都是在介绍各种概念,几乎没有涉及到API的层面,因为一旦涉及到API,那么估计写两个星期也写不完。

End

相关免费学习推荐:java基础教程

以上就是终于来了...RocketMQ扫盲篇的详细内容,更多请关注其它相关文章!


推荐阅读
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了高校天文共享平台的开发过程中的思考和规划。该平台旨在为高校学生提供天象预报、科普知识、观测活动、图片分享等功能。文章分析了项目的技术栈选择、网站前端布局、业务流程、数据库结构等方面,并总结了项目存在的问题,如前后端未分离、代码混乱等。作者表示希望通过记录和规划,能够理清思路,进一步完善该平台。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • GPT-3发布,动动手指就能自动生成代码的神器来了!
    近日,OpenAI发布了最新的NLP模型GPT-3,该模型在GitHub趋势榜上名列前茅。GPT-3使用的数据集容量达到45TB,参数个数高达1750亿,训练好的模型需要700G的硬盘空间来存储。一位开发者根据GPT-3模型上线了一个名为debuid的网站,用户只需用英语描述需求,前端代码就能自动生成。这个神奇的功能让许多程序员感到惊讶。去年,OpenAI在与世界冠军OG战队的表演赛中展示了他们的强化学习模型,在限定条件下以2:0完胜人类冠军。 ... [详细]
  • svnWebUI:一款现代化的svn服务端管理软件
    svnWebUI是一款图形化管理服务端Subversion的配置工具,适用于非程序员使用。它解决了svn用户和权限配置繁琐且不便的问题,提供了现代化的web界面,让svn服务端管理变得轻松。演示地址:http://svn.nginxwebui.cn:6060。 ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 如何提高PHP编程技能及推荐高级教程
    本文介绍了如何提高PHP编程技能的方法,推荐了一些高级教程。学习任何一种编程语言都需要长期的坚持和不懈的努力,本文提醒读者要有足够的耐心和时间投入。通过实践操作学习,可以更好地理解和掌握PHP语言的特异性,特别是单引号和双引号的用法。同时,本文也指出了只走马观花看整体而不深入学习的学习方式无法真正掌握这门语言,建议读者要从整体来考虑局部,培养大局观。最后,本文提醒读者完成一个像模像样的网站需要付出更多的努力和实践。 ... [详细]
  • Zookeeper为分布式环境提供灵活的协调基础架构。ZooKeeper框架支持许多当今最好的工业应用程序。我们将在本章中讨论ZooKeeper的一些最显着的应用。雅虎ZooKee ... [详细]
  • Java开发实战讲解!字节跳动三场技术面+HR面
    二、回顾整理阿里面试题基本就这样了,还有一些零星的问题想不起来了,答案也整理出来了。自我介绍JVM如何加载一个类的过程,双亲委派模型中有 ... [详细]
  • 原创 | 大数据入门基础系列之ClouderaManager版本的Hive安装部署
    添加服务,一 ... [详细]
author-avatar
张海英Bs
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有