热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot+RabbitMQ消息确认机制详解

本文详细介绍如何在SpringBoot项目中使用RabbitMQ的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。

本文详细介绍了如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认。希望通过本文能够帮助开发者解决在实际操作中可能遇到的问题。

一、环境准备

首先,需要在项目中添加 Spring Boot AMQP 依赖:


  org.springframework.boot
  spring-boot-starter-amqp

然后,配置 RabbitMQ 连接信息和消息确认机制:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送者确认机制
spring.rabbitmq.publisher-cOnfirms=true
# 开启发送者返回机制
spring.rabbitmq.publisher-returns=true
# 设置消费端手动 ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

定义交换机和队列,并将队列绑定到交换机上:

@Configuration
public class QueueConfig {

  @Bean(name = "confirmTestQueue")
  public Queue confirmTestQueue() {
    return new Queue("confirm_test_queue", true, false, false);
  }

  @Bean(name = "confirmTestExchange")
  public FanoutExchange confirmTestExchange() {
    return new FanoutExchange("confirmTestExchange");
  }

  @Bean
  public Binding confirmTestFanoutExchangeAndQueue(
      @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
      @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
    return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
  }
}

RabbitMQ 的消息确认分为两部分:发送消息确认和消息接收确认。

Spring Boot + RabbitMQ 消息确认机制

二、消息发送确认

当消息被 RabbitMQ Broker 接收到时,会触发 ConfirmCallback 回调:

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
      log.error("消息发送异常!");
    } else {
      log.info("发送者已收到确认,correlatiOnData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
    }
  }
}

实现 ConfirmCallback 接口,重写 confirm() 方法,参数包括 correlationData、ack 和 cause:

  • correlationData:包含消息的唯一标识 ID。
  • ack:消息是否成功投递到 Broker,true 表示成功。
  • cause:投递失败的原因。

如果消息未能投递到目标队列,将触发 ReturnCallback 回调:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
  }
}

实现 ReturnCallback 接口,重写 returnedMessage() 方法,参数包括 message、replyCode、replyText、exchange 和 routingKey。

具体的消息发送代码如下:

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ConfirmCallbackService confirmCallbackService;

@Autowired
private ReturnCallbackService returnCallbackService;

public void sendMessage(String exchange, String routingKey, Object msg) {
  rabbitTemplate.setMandatory(true);
  rabbitTemplate.setConfirmCallback(confirmCallbackService);
  rabbitTemplate.setReturnCallback(returnCallbackService);
  rabbitTemplate.convertAndSend(exchange, routingKey, msg,
      message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
      },
      new CorrelationData(UUID.randomUUID().toString()));
}

三、消息接收确认

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

  @RabbitHandler
  public void processHandler(String msg, Channel channel, Message message) throws IOException {
    try {
      log.info("收到消息:{}", msg);
      // 具体业务逻辑
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      if (message.getMessageProperties().getRedelivered()) {
        log.error("消息已重复处理失败,拒绝再次接收...");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
      } else {
        log.error("消息即将再次返回队列处理...");
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
      }
    }
  }
}

消费消息有三种回执方法:

  • basicAck:表示成功确认,消息将被删除。
  • basicNack:表示失败确认,消息将重新入队列。
  • basicReject:拒绝消息,不能进行批量操作。

具体方法如下:

void basicAck(long deliveryTag, boolean multiple)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
void basicReject(long deliveryTag, boolean requeue)

四、测试

通过上述配置和代码,可以进行消息发送和接收的测试,验证消息确认机制的有效性。

五、常见问题及解决方案

开启消息确认机制时,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。

在处理业务逻辑时,如果发生异常,消息会被无限投递进队列,导致死循环。解决方案是先将消息进行应答,然后重新发送消息到队列尾部,保证消息不会丢失且正常业务可以继续进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(msg));

为了进一步优化,可以设置消息重试次数,达到重试上限后手动确认并删除消息,将消息持久化到 MySQL 并推送报警,进行人工处理和定时任务补偿。

如何保证 MQ 的消费是幂等性的,需要根据具体业务而定,可以通过 MySQL 或 Redis 将消息持久化,并通过消息中的唯一性属性进行校验。

至此,关于如何使用 Spring Boot + RabbitMQ 消息确认机制的学习就结束了,希望能够解决大家的疑惑。理论与实践相结合,能更好地帮助大家掌握这一技能,快去试试吧!若想继续学习更多相关知识,请继续关注编程笔记网站,小编会继续努力为大家带来更多实用的文章!


推荐阅读
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • Redis 是一个高性能的开源键值存储系统,支持多种数据结构。本文将详细介绍 Redis 中的六种底层数据结构及其在对象系统中的应用,包括字符串对象、列表对象、哈希对象、集合对象和有序集合对象。通过12张图解,帮助读者全面理解 Redis 的数据结构和对象系统。 ... [详细]
  • 开发笔记:前端之前端初识
    开发笔记:前端之前端初识 ... [详细]
  • 在项目需要国际化处理时,即支持多种语言切换的功能,通常有两种方案:单个包和多个包。本文将重点讨论单个包的实现方法。 ... [详细]
  • Ping 命令的高级用法与技巧
    本文详细介绍了 Ping 命令的各种高级用法和技巧,帮助读者更好地理解和利用这一强大的网络诊断工具。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 本文详细探讨了使用纯JavaScript开发经典贪吃蛇游戏的技术细节和实现方法。通过具体的代码示例,深入解析了游戏逻辑、动画效果及用户交互的实现过程,为开发者提供了宝贵的参考和实践经验。 ... [详细]
  • http:blog.csdn.netzeo112140articledetails7675195使用TCPdump工具,抓TCP数据包。将数据包上传到PC,通过Wireshark查 ... [详细]
  • 利用 Python Socket 实现 ICMP 协议下的网络通信
    在计算机网络课程的2.1实验中,学生需要通过Python Socket编程实现一种基于ICMP协议的网络通信功能。与操作系统自带的Ping命令类似,该实验要求学生开发一个简化的、非标准的ICMP通信程序,以加深对ICMP协议及其在网络通信中的应用的理解。通过这一实验,学生将掌握如何使用Python Socket库来构建和解析ICMP数据包,并实现基本的网络探测功能。 ... [详细]
  • 分享一款基于Java开发的经典贪吃蛇游戏实现
    本文介绍了一款使用Java语言开发的经典贪吃蛇游戏的实现。游戏主要由两个核心类组成:`GameFrame` 和 `GamePanel`。`GameFrame` 类负责设置游戏窗口的标题、关闭按钮以及是否允许调整窗口大小,并初始化数据模型以支持绘制操作。`GamePanel` 类则负责管理游戏中的蛇和苹果的逻辑与渲染,确保游戏的流畅运行和良好的用户体验。 ... [详细]
  • HTML 页面中调用 JavaScript 函数生成随机数值并自动展示
    在HTML页面中,通过调用JavaScript函数生成随机数值,并将其自动展示在页面上。具体实现包括构建HTML页面结构,定义JavaScript函数以生成随机数,以及在页面加载时自动调用该函数并将结果呈现给用户。 ... [详细]
  • 本章节在上一章的基础上,深入探讨了如何通过引入机器人实现自动聊天、表情包回应以及Adidas官方账号的自动抽签功能。具体介绍了使用wxpy库进行微信机器人的开发,优化了智能回复系统的性能和用户体验。通过详细的代码示例和实践操作,展示了如何实现这些高级功能,进一步提升了机器人的智能化水平。 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
author-avatar
UP7家族--婵婵_172
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有