本文详细介绍了如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认。希望通过本文能够帮助开发者解决在实际操作中可能遇到的问题。
一、环境准备
首先,需要在项目中添加 Spring Boot AMQP 依赖:
org.springframework.boot
spring-boot-starter-amqp
然后,配置 RabbitMQ 连接信息和消息确认机制:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送者确认机制
spring.rabbitmq.publisher-cOnfirms=true
# 开启发送者返回机制
spring.rabbitmq.publisher-returns=true
# 设置消费端手动 ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
定义交换机和队列,并将队列绑定到交换机上:
@Configuration
public class QueueConfig {
@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}
RabbitMQ 的消息确认分为两部分:发送消息确认和消息接收确认。
二、消息发送确认
当消息被 RabbitMQ Broker 接收到时,会触发 ConfirmCallback 回调:
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("发送者已收到确认,correlatiOnData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
实现 ConfirmCallback 接口,重写 confirm() 方法,参数包括 correlationData、ack 和 cause:
- correlationData:包含消息的唯一标识 ID。
- ack:消息是否成功投递到 Broker,true 表示成功。
如果消息未能投递到目标队列,将触发 ReturnCallback 回调:
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
实现 ReturnCallback 接口,重写 returnedMessage() 方法,参数包括 message、replyCode、replyText、exchange 和 routingKey。
具体的消息发送代码如下:
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
public void sendMessage(String exchange, String routingKey, Object msg) {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallbackService);
rabbitTemplate.setReturnCallback(returnCallbackService);
rabbitTemplate.convertAndSend(exchange, routingKey, msg,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(UUID.randomUUID().toString()));
}
三、消息接收确认
@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("收到消息:{}", msg);
// 具体业务逻辑
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
消费消息有三种回执方法:
- basicNack:表示失败确认,消息将重新入队列。
- basicReject:拒绝消息,不能进行批量操作。
具体方法如下:
void basicAck(long deliveryTag, boolean multiple)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
void basicReject(long deliveryTag, boolean requeue)
四、测试
通过上述配置和代码,可以进行消息发送和接收的测试,验证消息确认机制的有效性。
五、常见问题及解决方案
开启消息确认机制时,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。
在处理业务逻辑时,如果发生异常,消息会被无限投递进队列,导致死循环。解决方案是先将消息进行应答,然后重新发送消息到队列尾部,保证消息不会丢失且正常业务可以继续进行。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
为了进一步优化,可以设置消息重试次数,达到重试上限后手动确认并删除消息,将消息持久化到 MySQL 并推送报警,进行人工处理和定时任务补偿。
如何保证 MQ 的消费是幂等性的,需要根据具体业务而定,可以通过 MySQL 或 Redis 将消息持久化,并通过消息中的唯一性属性进行校验。
至此,关于如何使用 Spring Boot + RabbitMQ 消息确认机制的学习就结束了,希望能够解决大家的疑惑。理论与实践相结合,能更好地帮助大家掌握这一技能,快去试试吧!若想继续学习更多相关知识,请继续关注编程笔记网站,小编会继续努力为大家带来更多实用的文章!