作者:三砖先生 | 来源:互联网 | 2023-09-02 20:56
上一篇RabbitMQ入门之基础概念介绍了RabbitMQ的一些基础概念,本文再来介绍其中的一些细节和其它的进阶的概念。一、消息生产者发送的消息不可达时如何处理RabbitMQ提供
上一篇 RabbitMQ 入门之基础概念 介绍了 RabbitMQ 的一些基础概念,本文再来介绍其中的一些细节和其它的进阶的概念。
一、消息生产者发送的消息不可达时如何处理
RabbitMQ 提供了消息在传递过程中无法发送到一个队列(比如根据自己的类型和路由键没有找到匹配的队列)时将消息回传给消息发送方的功能,使用 RabbitMQ 的客户端提供 channel.basicPublish
方法的两个参数 mandatory
和 immediate
(RabbitMQ 3.0 以下版本),除此之外还提供了一个备份交换器可以将无法发送的消息存储起来处理,不用重新传回给发送方。
1.1 mandatory 参数
mandatory 被定义在 RabbitMQ 提供的客户端的 channel.basicPublish
方法中,如下所示:
当把方法的 mandatory 参数设置为 true
时,那么会在交换器无法根据自身的类型和路由键找到一个符合要求的队列时,RabbitMQ 会自动调用 Basic.Return
把该消息回传给发送方也就是我们的消息生产者。反之,如果设置为 false
的话,消息就会被直接丢弃掉。那么问题来了,我们要如何去获取这些没有被发送出去的消息呢?RabbitMQ 给我们提供了事件监听机制来获取这种消息,可以通过 addReturnListener
方法添加一个 ReturnListener
来获取这种未发送到队列的消息,如下所示:
通过查看 ReturnListener 接口的源码可以看到,该接口只有一个方法,如果是 JDK8+ 的版本的话可以使用 Lambda 表达式来简化一些代码。
可以看出,当设置了 mandatory 参数时,还必须为生产者同时添加 ReturnListener 监听器的编程逻辑,这样就会使得生产者的代码变得更加复杂了,为了处理这种情况,RabbitMQ 提供了 `备份交换器` 来将没有成功路由出去的消息存储起来,当我们需要的时候再去处理即可。
1.2 immediate 参数
该的参数同样也是在channel.basicPublish
方法中定义的,其官方描述如下:
This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.
当把 immediate 参数设置为 true 时,如果交换器根据其类型和路由键找到符合要求的队列时,发现所有队列上没有任何消费者,则该消息并不会存入到队列中,会通过 Basic.Return 命令把消息回传给生产者。简而言之也就是说,当设置了 immediate 参数时,该消息关联的队列上存在消费者时,会立即发送消息到该队列中,反之如果匹配的队列上不存在任何消费者,则直接把消息回传给生产者。这里有一点需要注意的是:从 RabbitMQ 3.0 + 已经去除了该参数。
二、如何对消息和队列设置过期时间 (TTL)
TTL 是 time to live 首字母的简称,RabbitMQ 中可以设置消息和队列的过期时间,我们先来看看要如何设置消息的过期时间。
1.1 消息 TTL 设置
RabbitMQ 提供了两种设置消息的过期时间,第一种是通过队列的属性设置,该方式的特点就是队列中所有消息的过期时间都一致。还有一种是更小粒度的设置,就是对每条消息单独设置过期时间,这种方式更加灵活,每条消息的过期时间都可以不一样。这是你可能会问,如果同时设置了队列的过期属性和消息本身的过期属性,最终以哪个为准呢?结果是 RabbitMQ 会比较这两个 TTL 的值大小,以较小的那个为准。很容易想到,通过队列的属性的方式设置过期时间的话是在声明队列的时候指定,对应到客户端就是其提供的 channel.queueDeclare
方法的参数 arguments 指定,示例代码如下:
需要注意的是 x-message-ttl
参数的单位是毫秒。如果不设置 TLL,则表示该消息不会过期,如果将 TTL 设置为 0,表示除非此时可以把消息直接发送投递到消费者端去,否则就会直接丢弃该消息。
准对每条消息设置 TTL 的方法是在发送消息的时候设置的,对应到客户端方法是 channel.basicPublish
的 expiration 属性参数,具体设置代码如下:
这种设置方式,即使队列过期也不会立即从队列中移除,因为每条消息是否过期的判定是在发送到消费者是才进行的,如果此时发现已经过期才会删除消息。而对于第一种方式则会把已经过期的消息移到队列头部,然后 RabbitMQ 只要定期的从头开始扫描是否存在过期的消息即可。
1.2 队列 TTL 设置
设置队列的过期时间使用的是客户端的 channel.queueDeclare
方法参数中的 x-expires
参数,其单位同样也是毫秒,不过需要注意的是它不能设置为 0。设置队列过期的代码如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8S2FHXO6-1596969415042)(https://cdn.nlark.com/yuque/0/2020/png/285197/1596961384667-34a7e51a-5ecd-4d32-bfe2-e46ba173f91c.png?x-oss-process=image%2Fresize%2Cw_1500 “image.png”)]
上面代码创建了一个过期时间为 15 分钟的队列。
三、死信队列介绍
死信交换器(DLX)的全称是 Dead-Letter-Exchange
,也称之为死信邮箱。简单来说就是当一个消息由于 消息被拒绝
、 消息过期
、 队列达到最大长度
时,变成死信(dead message)之后,会被重新发送到一个交换器中,这个交换器就是死信交换器,绑定在这个交换器上的队列就称之为死信队列。死信交换器实际上就是平常的交换器,可以在任何队列上指定,当在一个队列上设置死信交换器后,如果该队列出现死信时就会被 RabbitMQ 把死信消息重新发送到死信交换器上去,然后路由到死信队列中,我们可以监听这个队列来处理那些死信消息。为一个队列设置死信交换器是在生产者的声明队列的方法中设置 x-dead-letter-message
参数来实现的,如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uwevqar7-1596969415044)(https://cdn.nlark.com/yuque/0/2020/png/285197/1596962590228-f7da6c9d-ef06-4284-8873-7cffa6351329.png?x-oss-process=image%2Fresize%2Cw_1500 “image.png”)]
同时也可以通过 x-dead-letter-routing-key
参数设置死信交互器的路由键,不设置默认使用原始度列的路由键。可以到 RabbitMQ 的后台管理界面,有 DLX
标志的就是死信队列。
RabbitMQ 提供的 DLX 是个比较实用的功能特性,它可以在我们消息不能被消费者正确消费的情况下放入到死信队列,后续我们可以通过这个死信队列的内容来查看异常情况来改造和优化系统。
四、延迟队列介绍
顾名思义,延迟队列存储的是哪些需要等待指定时间后才能拿到的延迟消息,一个比较典型的场景就是订单 30 分钟后未支付取消订单。这里需要注意的是,在 RabbitMQ 中并没有直接提供延迟队列的功能,而是需要通过上面介绍的过期时间(TTL)和死信队列一起来实现,比如超时取消订单这个场景,我们可以让消费者订阅死信队列,设置正常的那个队列的超时时间为 30 分钟并绑定到该死信队列上,当消息超过 30 分钟未被处理后消息就会把发送到死信队列中,然后死信队列的消费者就可以在 30 分钟后成功的消费到该消息了。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KVvvABGF-1596969415046)(https://cdn.nlark.com/yuque/0/2020/png/285197/1596964785505-925cb926-2429-4ccd-ac92-a6ebc49afc5e.png “image.png”)]
同时当我们有其它的超时配置需求时也很方便扩展,比如可以在生产者发送消息的时候通过设置不同的路由键,通过路由键来将消息发送到与交换器绑定的不同队列中,然后这些队列分别设置不同的过期时间和与之相对应的死信队列,当消息过期时就会被 RabbitMQ 转发到相应的死信队列中,这样就可以去订阅相应的死信队列即可。
五、交换器、消息和队列持久化
持久化可以提高可靠性,可以防止宕机或者重启等异常下数据的丢失,RabbitMQ 的持久化从组成结构上可以分为三个部分,即交换器持久化、消息持久化和队列持久化。
1.1 交换器持久化
交换器持久化是在声明交换器时将 durable 参数设置为 true 来实现的。如果不设置持久化属性的话,当 RabbitMQ 服务重启后交换器的数据就会丢失,需要注意的是,是交换器的数据丢失,消息不会丢失,只是不能将消息发送到这个交换器中了,一般生产环境使用都会把该属性设置为持久化。
1.2 消息持久化
交换器的持久化仅仅只是保证了交换器本身的元数据不会丢失,无法保证其存储的消息不会丢失,如果需要其内部存储的消息不丢失,则需要设置消息的持久化,通过将消息的投递模式(deliveryMode)设置为 2 即可实现消息的持久化,如下所示:
需要消息持久化的前提是其所在的队列也要设置持久化,假如仅仅只设置消息的持久化的话,RabbitMQ 重启之后队列消失,然后消息也会丢失。这里有点需要注意一下,虽然持久化可以提高可靠性,但是持久化是将数据存储到硬盘上,比直接操作内存要慢很多,所以对于哪些可靠性要求不高的业务不需要进行持久化。
1.3 队列持久化
队列的持久化的设置和交换器持久化类似,同样也是在声明的时候通过 durable 参数设置为 true 实现的,如果不设置,当 RabbitMQ 重启后,相关的队列元数据也会丢失,相应的其存储的消息也会随之丢失掉。
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?其实无法保证百分之百数据不丢失。比如消费者在订阅消费队列时将自动应答(autoAck)参数设置为 true 的话,在接收到消息后还没来得及处理就挂了,这时需要把自动应答设置 false,进行手动 ack 应答即可。还有一个就是由于不是实时持久化存盘,当消息存盘的过程中 RabbitMQ 宕机了,此时也会发生数据丢失,此时需要通过 RabbitMQ 的 镜像队列机制
来处理了。
五、总结
本文主要介绍了一些参数具体使用时的设置细节和死信队列、延迟队列以及持久化等,还有一些比较重要的点没有涉及到,比如消息确认机制。“纸上得来终觉浅,绝知此事要躬行”,在了解一些基础的概念之后还是需要通过具体编码实践才能对其更加理解深刻。