Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
RocketMQ 优点:
RocketMQ 缺点:
特性 | ActiveMq | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms级 | us级 | ms级 | ms级 |
可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
在了解rocketmq之前有必要对rocketmq的相关概念混个脸熟
Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。
Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签是Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。
消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。
一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引是Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。
Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。
消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
消费者是Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
Apache RocketMQ 中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ 的服务端完成。
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。
在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
事务消息是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
Apache RocketMQ 是一款典型的分布式架构下的中间件产品,使用异步通信方式和发布订阅的消息传输模型。 Apache RocketMQ 产品具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。
如上图(官方图)所示,Apache RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。
生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题中,消费者通过订阅主题消费消息。
分布式系统架构思想下,将复杂系统拆分为多个独立的子模块,例如微服务模块。此时就需要考虑子模块间的远程通信,典型的通信模式分为以下两种:
一种是同步的RPC远程调用
同步RPC调用模型下,不同系统之间直接进行调用通信,每个请求直接从调用方发送到被调用方,然后要求被调用方立即返回响应结果给调用方,以确定本次调用结果是否成功。 注意 此处的同步并不代表RPC的编程接口方式,RPC也可以有异步非阻塞调用的编程方式,但本质上仍然是需要在指定时间内得到目标端的直接响应。
一种是基于中间件代理的异步通信方式
异步消息通信模式下,各子系统之间无需强耦合直接连接,调用方只需要将请求转化成异步事件(消息)发送给中间代理,发送成功即可认为该异步链路调用完成,剩下的工作中间代理会负责将事件可靠通知到下游的调用系统,确保任务执行完成。该中间代理一般就是消息中间件。
异步通信的优势
主流的消息中间件的传输模型主要为点对点模型和发布订阅模型。
点对点模型
点对点模型也叫队列模型,具有如下特点:
发布订阅模型
发布订阅模型具有如下特点:
点对点模型和发布订阅模型对比
点对点模型和发布订阅模型各有优势,点对点模型更为简单,而发布订阅模型的扩展性更高。 Apache RocketMQ 使用的传输模型为发布订阅模型,因此也具有发布订阅模型的特点。
NameServer: NameServer 是专为 RocketMQ 设计的轻量级名称服务,具有简单、可集群横吐扩展、无状态,节点之间互不通信等特点。
Broker集群: Broker用于接收生产者发送消息,或者消费者消费消息的请求。一个Broker集群由多组Master/Slave组成,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
Producer集群: 消息的生产者,通过NameServer集群获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等。Producer只会将消息发送到Master节点上,因此只需要与Master节点建立连接。
Consumer集群: 消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
我们以windows机器为例,linux也是一样
下载地址:
https://archive.apache.org/dist/rocketmq/5.0.0/
https://www.apache.org/dyn/closer.cgi?path=rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
我们选择5.0解压缩版本
放到自己的工作目录解压:
配置环境变量
修改堆内存大小(这个可以根据机器适当调整,也可不做调整)
进去到bin目录,修改runserver.cmd 和 runbroker.cmd 的启动脚本
在bin目录下进入cmd模式,执行 start mqnamesrv.cmd
我们可以看到启动之后报了一个错,找不到或无法加载主类
解决办法:
编辑runbroker.cmd 为 %CLASSPATH% 增加 双引号
大家需要注意一点,rocketmq的安装路径一定不要有空格,否则还会提示找不到主类(我上面的截图中因为有空格,所以又失败了,大家也检查下自己的安装路径)
再次启动:
这次成功了
bin目录下进入cmd
执行如下命令 (autoCreateTopicEnable=true 允许自动创建topic)
mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
看到如下提示标识启动成功
rocketmq-console下载地址:
https://github.com/apache/rocketmq-externals
修改application.properties配置文件:
启动项目:
1、执行命令进行打包(首先电脑要安装maven并配置环境变量)
mvn clean package -Dmaven.test.skip=true
看到目录下生成了target文件夹表示打包成功
进到target目录,
执行启动命令
java -jar rocketmq-console-ng-1.0.0.jar
浏览器访问 http://localhost:8080/#/
如果没有主题先新增主题,点击发送消息
执行命令,启动消费者监听
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
可以看到我们上面发送的消息
管理端查看消息:
分割线-----------------------------------------------------------------------------------------------------------
创作不易,三连支持一下吧 👍
最后的最后送大家一句话
白驹过隙,沧海桑田
与君共勉