1.为什么要进行消息确认?
2.rabbitmq消息确认 机制是什么样的?
3.发送方如何确认消息发送成功?什么样才算发送成功?
4.消费方如何告知rabbitmq消息消费成功或失败?
经常会听到丢消息的字眼, 对于前面的demo来说,就存在丢消息的隐患.
发送者没法确认是否发送成功,消费者处理失败也无法反馈.
没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况.
首先看官网对消息确认的介绍http://www.rabbitmq.com/confirms.html
Networks can fail in less-than-obvious ways and detecting some failures takes time. Therefore a client that’s written a protocol frame or a set of frames (e.g. a published message) to its socket cannot assume that the message has reached the server and was successfully processed. It could have been lost along the way or its delivery can be significantly delayed.
网络可能以不太明显的方式发生故障,检测一些故障需要时间。因此,客户端在其套接字中写入了一个或一组协议帧(例如,发布的消息),不能假定消息已经到达服务器并被成功处理。它可能在途中丢失,也可能在运送过程中被严重延迟。
Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by using transactions – make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
使用标准的AMQP 0-9-1,保证消息不丢失的唯一方法是使用事务——然后为每个消息或一组消息发布、提交使通道事务性。在这种情况下,事务是不必要的重量级,并将吞吐量降低250倍。为了解决这个问题,引入了一种确认机制。它模仿协议中已经存在的消费者确认机制
To enable confirms, a client sends the confirm.select method. Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok. Once the confirm.select method is used on a channel, it is said to be in confirm mode. A transactional channel cannot be put into confirm mode and once a channel is in confirm mode, it cannot be made transactional.
为了启用confirm,客户端发送confirm.select方法。根据是否设置了no-wait,代理可能会响应confirm.select-ok。一旦确认,在通道上使用confirm.select,称为处于确认模式。事务通道不能设置为确认模式,并且一旦通道设置为确认模式,就不能设置为事务通道,简言之:实物和确认机制不能同时使用
Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled.
一旦通道处于confirm模式,broker 和client 都会对消息进行计数(在第一个confirm.select上从1开始计数)。然后,broker 在处理消息时通过发送 basic.ack 。在同一个频道。delivery-tag字段包含已确认消息的序列号。代理也可以在basic中设置多个字段。ack来指示直到并包括具有序列号的消息在内的所有消息都已处理
点这里看官方的例子
判断消息成功或失败,其实就是看进行消息确认的时机,因为成功或失败后就会把结果告诉发送方
When Will Published Messages Be Confirmed by the Broker?
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won’t route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).
对于不可路由的消息,一旦交换器验证消息不会路由到任何队列(返回空队列列表),broker 将发出确认。如果消息也被强制发布,则 basic.return在basic.ack之前发送给客户端。否定的确认也是如此(basic.nack)。
For routable messages, the basic.ack is sent when a message has been accepted by all the queues. For persistent messages routed to durable queues, this means persisting to disk. For mirrored queues, this means that all mirrors have accepted the message.
对于可路由消息, 当消息被所有队列接受时发送basic.ack。对于路由到持久队列的持久消息,这意味着持久化到磁盘。对于镜像队列,这意味着所有镜像都已接受消息
根据消费方不同的确认模式,确认时机也不同.
自动确认会在消息发送给消费者后立即确认,如果手动则当消费者调用ack,nack,reject几种方法时进行确认.
一般会设置手动模式,业务失败后可以进行一些操作.
上面我们从官网上了解到,消息生产之发送到队列很难保证消息一定能成功发送:这样我们需要数据准确性或者安全性来说我们需要一定的机制处理,官网上也说了 ,使用事务完全可以帮我们解决这类的问题,并且rabbitmq也支持事务,但是事务会导致rabbitmq并发量下降250倍,这也就是为啥工作中我们从来不用事务的原因,既然不用我们就不研究了我们看看其他的机制:
总结:
confirm 机制:
2.Return Listener用于处理一些不可路由的消息;
消息生产者,通过指定一个Exchange和Routingkey,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作;
但某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就需要Return Listener!
发送方确认开启:
其实代码在上面配置连接的时候已经放出来了 就是在连接工厂那被注释的一行代码 :
connectionFactory.setPublisherConfirms(true);
如果是yml配置的话:
spring:rabbitmq:publisher-confirms: true
实现一个回调类
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {Logger logger = LoggerFactory.getLogger(MyConfirmCallback.class);@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(ack);logger.info("confirmcallback:correlationData={} ,ack={} , cause= {}",correlationData,ack,cause);}
}
在RabbitmqTemplate 设置一下
template.setConfirmCallback(new MyConfirmCallback());
而且我们可以在发送消息的时候附带一个CorrelationData参数 这个对象可以设置一个id,可以是你的业务id 方便进行对应的操作
public void testSend() {//至于为什么调用这个API 后面会解释//参数介绍: 交换机名字,路由建, 消息内容rabbitTemplate.convertAndSend("directExchange", "direct.key", "hello11",new CorrelationData(UUID.randomUUID().toString()));}
效果打印出来:
true
2021-02-06 10:20:09.452 INFO 37444 --- [nectionFactory1] c.w.r.callback.MyConfirmCallback : confirmcallback:correlationData=CorrelationData [id=16e3e463-2fb2-4922-9148-41d964175d86] ,ack=true , cause= null
注意 使用失败回调也需要开启发送方确认模式 开启方式在下文
更改RabbitmqTemplate:
//开启mandatory模式(开启失败回调)template.setMandatory(true);//指定失败回调接口的实现类template.setReturnCallback(new MyReturnCallback());
回调接口的实现类:
实现RabbitTemplate.ReturnCallback里面的returnedMessage方法即可 他会吧相关的参数都传给你
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {Logger logger = LoggerFactory.getLogger(MyReturnCallback.class);@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {logger.info("message={} , repylCode={},replyText={}, exchange={},routingkey={}",message,replyCode,replyText,exchange,routingKey);}
}
这里模拟一个失败的发送 : 当指定的交换机不能吧消息路由到队列时(没有指定路由建或者指定的路右键没有绑定对应的队列 或者压根就没有绑定队列都会失败) 消息就会发送失败 效果:
2021-02-06 10:34:44.456 INFO 60952 --- [nectionFactory1] c.w.r.callback.MyReturnCallback : message=(Body:'hello11' MessageProperties [headers={spring_returned_message_correlation=2887149f-b208-4d07-b512-75283fc201a9}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) , repylCode=312,replyText=NO_ROUTE, exchange=directExchange,routingkey=direct.key111
true
2021-02-06 10:34:44.459 INFO 60952 --- [nectionFactory2] c.w.r.callback.MyConfirmCallback : confirmcallback:correlationData=CorrelationData [id=2887149f-b208-4d07-b512-75283fc201a9] ,ack=true , cause= null
消息的可靠投递,100% 的保证消息不丢失(大厂高频面试)?
什么是生产端的可靠性投递?
保证消息的成功发出
保证mq节点的成功接收
发送端收到Mq节点确认应答(confirm机制)
以上3点,也不能够确保消息的100%投递成功我们需要自己设计补偿机制
如何设置
方案一:
方案一: 优点比较简单,定时任务跑下发送的消息状态即可
缺点也很明显,对数据库访问比较多,mq的并非一般比较高,io将成为系统的瓶颈
方案二:
为什么要确认消费? 默认情况下 消费者在拿到rabbitmq的消息时 已经自动确认这条消息已经消费了, 讲白话就是rabbitmq的队列里就会删除这条消息了, 但是 我们实际开发中 难免会遇到这种情况, 比如说 拿到这条消息 发现我处理不了 比如说 参数不对, 又比如说 我当前这个系统出问题了, 暂时不能处理这个消息, 但是 这个消息已经被你消费掉了 rabbitmq的队列里也删除掉了, 你自己这边又处理不了, 那么 ,这个消息就被遗弃了。 这种情况在实际开发中是不合理的, rabbitmq提供了解决这个问题的方案, 也就是我们上面所说的confirm模式 只是我们刚刚讲的是发送方的 这次我们来讲消费方的。
设置一下消息确认为手动确认:
当然 我们要对我们的消费者监听器进行一定的配置的话, 我们需要先实例一个监听器的Container 也就是容器, 那么我们的监听器(一个消费者里面可以实例多个监听器) 可以指定这个容器 那么我们只需要对这个Container(容器) 进行配置就可以了
首先得声明一个容器并且在容器里面指定消息确认为手动确认:
@Beanpublic SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory s = new SimpleRabbitListenerContainerFactory();//connectionFactory 这个我们自己手工配置的连接工厂注入进来s.setConnectionFactory(connectionFactory);//设置确认模式 NONE MANUAL AUTO//无确认模式 NONE//有确认 确认方式:MANUAL 手工 AUTO 自动s.setAcknowledgeMode(AcknowledgeMode.MANUAL);return s;}
消费端修改
package com.hrp.mq.springboot;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Random;/*** ecs** @Title: com.hrp.mq.springboot* @Date: 2020/8/6 20:16* @Author: wfg* @Description:* @Version:*/
@Component
public class AckConsumer {//containerFactory:指定我们刚刚配置的容器// @RabbitListener(queues = "testQueue1",containerFactory = "simpleRabbitListenerContainerFactory")public void toMessage(Message message, Channel channel) throws IOException {System.out.println("消费者1============="+new String(message.getBody()));//这里根据自己的业务是否成功选择是否确认if (doSomthing()){/*** 参数1: 消息标识, 这个我们通过Message 获取就可以,这个是有,RabbitMq来维护的* 参数2: 是否批量处理*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else {/*** 当然 如果这个订单处理失败了 我们也需要告诉rabbitmq 告诉他这条消息处理失败了 可以退回* 也可以遗弃 要注意的是 无论这条消息成功与否 一定要通知 就算失败了 如果不通知的话 rabbitmq端会显示这条* 消息一直处于未确认状态,那么这条消息就会一直堆积在rabbitmq端 除非与rabbitmq断开连接 那么他就会把这条* 消息重新发给别人 所以 一定要记得通知!* 前两个参数 和上面的意义一样, 最后一个参数 就是这条消息是返回到原队列 还是这条消息作废* 就是不退回了。*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}public boolean doSomthing(){boolean flag= new Random().nextBoolean();System.out.println(flag);return flag;}//containerFactory:指定我们刚刚配置的容器//@RabbitListener(queues = "testQueue1",containerFactory = "simpleRabbitListenerContainerFactory")public void toMessage1(Message message, Channel channel) throws IOException {System.out.println("消费者2=============" + new String(message.getBody()));//这里根据自己的业务是否成功选择是否确认/*** 参数1: 消息标识, 这个我们通过Message 获取就可以,这个是有,RabbitMq来维护的* 参数2: 是否批量处理*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = "testQueue",containerFactory = "simpleRabbitListenerContainerFactory")public void toMessage2(Message message, Channel channel) throws IOException {System.out.println("消费者1============="+new String(message.getBody()));//这里根据自己的业务是否成功选择是否确认if (false){/*** 参数1: 消息标识, 这个我们通过Message 获取就可以,这个是有,RabbitMq来维护的* 参数2: 是否批量处理*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}else {/*** 当然 如果这个订单处理失败了 我们也需要告诉rabbitmq 告诉他这条消息处理失败了 可以退回* 也可以遗弃 要注意的是 无论这条消息成功与否 一定要通知 就算失败了 如果不通知的话 rabbitmq端会显示这条* 消息一直处于未确认状态,那么这条消息就会一直堆积在rabbitmq端 除非与rabbitmq断开连接 那么他就会把这条* 消息重新发给别人 所以 一定要记得通知!* 前两个参数 和上面的意义一样, 最后一个参数 就是这条消息是返回到原队列 还是这条消息作废* 就是不退回了。*/System.out.println("消息退回了==========");channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}}
我们可以观察rabbitmq的管理页面以及控制台打印有助于我们理解确认模式,以及退回消息