热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot+RabbitMQ消息确认机制详解

本文详细介绍如何在SpringBoot项目中使用RabbitMQ的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。

本文详细介绍了如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认。希望通过本文能够帮助开发者解决在实际操作中可能遇到的问题。

一、环境准备

首先,需要在项目中添加 Spring Boot AMQP 依赖:


  org.springframework.boot
  spring-boot-starter-amqp

然后,配置 RabbitMQ 连接信息和消息确认机制:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送者确认机制
spring.rabbitmq.publisher-cOnfirms=true
# 开启发送者返回机制
spring.rabbitmq.publisher-returns=true
# 设置消费端手动 ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

定义交换机和队列,并将队列绑定到交换机上:

@Configuration
public class QueueConfig {

  @Bean(name = "confirmTestQueue")
  public Queue confirmTestQueue() {
    return new Queue("confirm_test_queue", true, false, false);
  }

  @Bean(name = "confirmTestExchange")
  public FanoutExchange confirmTestExchange() {
    return new FanoutExchange("confirmTestExchange");
  }

  @Bean
  public Binding confirmTestFanoutExchangeAndQueue(
      @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
      @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
    return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
  }
}

RabbitMQ 的消息确认分为两部分:发送消息确认和消息接收确认。

Spring Boot + RabbitMQ 消息确认机制

二、消息发送确认

当消息被 RabbitMQ Broker 接收到时,会触发 ConfirmCallback 回调:

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
      log.error("消息发送异常!");
    } else {
      log.info("发送者已收到确认,correlatiOnData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
    }
  }
}

实现 ConfirmCallback 接口,重写 confirm() 方法,参数包括 correlationData、ack 和 cause:

  • correlationData:包含消息的唯一标识 ID。
  • ack:消息是否成功投递到 Broker,true 表示成功。
  • cause:投递失败的原因。

如果消息未能投递到目标队列,将触发 ReturnCallback 回调:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
  }
}

实现 ReturnCallback 接口,重写 returnedMessage() 方法,参数包括 message、replyCode、replyText、exchange 和 routingKey。

具体的消息发送代码如下:

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ConfirmCallbackService confirmCallbackService;

@Autowired
private ReturnCallbackService returnCallbackService;

public void sendMessage(String exchange, String routingKey, Object msg) {
  rabbitTemplate.setMandatory(true);
  rabbitTemplate.setConfirmCallback(confirmCallbackService);
  rabbitTemplate.setReturnCallback(returnCallbackService);
  rabbitTemplate.convertAndSend(exchange, routingKey, msg,
      message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
      },
      new CorrelationData(UUID.randomUUID().toString()));
}

三、消息接收确认

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

  @RabbitHandler
  public void processHandler(String msg, Channel channel, Message message) throws IOException {
    try {
      log.info("收到消息:{}", msg);
      // 具体业务逻辑
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      if (message.getMessageProperties().getRedelivered()) {
        log.error("消息已重复处理失败,拒绝再次接收...");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
      } else {
        log.error("消息即将再次返回队列处理...");
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
      }
    }
  }
}

消费消息有三种回执方法:

  • basicAck:表示成功确认,消息将被删除。
  • basicNack:表示失败确认,消息将重新入队列。
  • basicReject:拒绝消息,不能进行批量操作。

具体方法如下:

void basicAck(long deliveryTag, boolean multiple)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
void basicReject(long deliveryTag, boolean requeue)

四、测试

通过上述配置和代码,可以进行消息发送和接收的测试,验证消息确认机制的有效性。

五、常见问题及解决方案

开启消息确认机制时,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。

在处理业务逻辑时,如果发生异常,消息会被无限投递进队列,导致死循环。解决方案是先将消息进行应答,然后重新发送消息到队列尾部,保证消息不会丢失且正常业务可以继续进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(msg));

为了进一步优化,可以设置消息重试次数,达到重试上限后手动确认并删除消息,将消息持久化到 MySQL 并推送报警,进行人工处理和定时任务补偿。

如何保证 MQ 的消费是幂等性的,需要根据具体业务而定,可以通过 MySQL 或 Redis 将消息持久化,并通过消息中的唯一性属性进行校验。

至此,关于如何使用 Spring Boot + RabbitMQ 消息确认机制的学习就结束了,希望能够解决大家的疑惑。理论与实践相结合,能更好地帮助大家掌握这一技能,快去试试吧!若想继续学习更多相关知识,请继续关注编程笔记网站,小编会继续努力为大家带来更多实用的文章!


推荐阅读
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • 深入解析TCP/IP五层协议
    本文详细介绍了TCP/IP五层协议模型,包括物理层、数据链路层、网络层、传输层和应用层。每层的功能及其相互关系将被逐一解释,帮助读者理解互联网通信的原理。此外,还特别讨论了UDP和TCP协议的特点以及三次握手、四次挥手的过程。 ... [详细]
  • 深入解析Android自定义View面试题
    本文探讨了Android Launcher开发中自定义View的重要性,并通过一道经典的面试题,帮助开发者更好地理解自定义View的实现细节。文章不仅涵盖了基础知识,还提供了实际操作建议。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • Splay Tree 区间操作优化
    本文详细介绍了使用Splay Tree进行区间操作的实现方法,包括插入、删除、修改、翻转和求和等操作。通过这些操作,可以高效地处理动态序列问题,并且代码实现具有一定的挑战性,有助于编程能力的提升。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • 本文介绍如何使用 Python 获取文件和图片的创建、修改及拍摄日期。通过多种方法,如 PIL 库的 _getexif() 函数和 os 模块的 getmtime() 和 stat() 方法,详细讲解了这些技术的应用场景和注意事项。 ... [详细]
  • 深入理解Redis的数据结构与对象系统
    本文详细探讨了Redis中的数据结构和对象系统的实现,包括字符串、列表、集合、哈希表和有序集合等五种核心对象类型,以及它们所使用的底层数据结构。通过分析源码和相关文献,帮助读者更好地理解Redis的设计原理。 ... [详细]
  • 不确定性|放入_华为机试题 HJ9提取不重复的整数
    不确定性|放入_华为机试题 HJ9提取不重复的整数 ... [详细]
  • 分享一个简化版的Silverlight链接图项目:Link Map Simplified
    本文介绍了一个使用Silverlight开发的可视化工具,主要用于展示和操作复杂的实体关系图(Graph)。该工具在犯罪调查系统中得到了广泛应用,帮助用户直观地获取和理解相关信息。 ... [详细]
  • 本文详细介绍了网络存储技术的基本概念、分类及应用场景。通过分析直连式存储(DAS)、网络附加存储(NAS)和存储区域网络(SAN)的特点,帮助读者理解不同存储方式的优势与局限性。 ... [详细]
  • 配置多VLAN环境下的透明SQUID代理
    本文介绍如何在包含多个VLAN的网络环境中配置SQUID作为透明网关。网络拓扑包括Cisco 3750交换机、PANABIT防火墙和SQUID服务器,所有设备均部署在ESXi虚拟化平台上。 ... [详细]
  • 采用IKE方式建立IPsec安全隧道
    一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ... [详细]
author-avatar
UP7家族--婵婵_172
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有