热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot+RabbitMQ消息确认机制详解

本文详细介绍如何在SpringBoot项目中使用RabbitMQ的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。

本文详细介绍了如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认。希望通过本文能够帮助开发者解决在实际操作中可能遇到的问题。

一、环境准备

首先,需要在项目中添加 Spring Boot AMQP 依赖:


  org.springframework.boot
  spring-boot-starter-amqp

然后,配置 RabbitMQ 连接信息和消息确认机制:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送者确认机制
spring.rabbitmq.publisher-cOnfirms=true
# 开启发送者返回机制
spring.rabbitmq.publisher-returns=true
# 设置消费端手动 ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

定义交换机和队列,并将队列绑定到交换机上:

@Configuration
public class QueueConfig {

  @Bean(name = "confirmTestQueue")
  public Queue confirmTestQueue() {
    return new Queue("confirm_test_queue", true, false, false);
  }

  @Bean(name = "confirmTestExchange")
  public FanoutExchange confirmTestExchange() {
    return new FanoutExchange("confirmTestExchange");
  }

  @Bean
  public Binding confirmTestFanoutExchangeAndQueue(
      @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
      @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
    return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
  }
}

RabbitMQ 的消息确认分为两部分:发送消息确认和消息接收确认。

Spring Boot + RabbitMQ 消息确认机制

二、消息发送确认

当消息被 RabbitMQ Broker 接收到时,会触发 ConfirmCallback 回调:

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
      log.error("消息发送异常!");
    } else {
      log.info("发送者已收到确认,correlatiOnData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
    }
  }
}

实现 ConfirmCallback 接口,重写 confirm() 方法,参数包括 correlationData、ack 和 cause:

  • correlationData:包含消息的唯一标识 ID。
  • ack:消息是否成功投递到 Broker,true 表示成功。
  • cause:投递失败的原因。

如果消息未能投递到目标队列,将触发 ReturnCallback 回调:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
  }
}

实现 ReturnCallback 接口,重写 returnedMessage() 方法,参数包括 message、replyCode、replyText、exchange 和 routingKey。

具体的消息发送代码如下:

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ConfirmCallbackService confirmCallbackService;

@Autowired
private ReturnCallbackService returnCallbackService;

public void sendMessage(String exchange, String routingKey, Object msg) {
  rabbitTemplate.setMandatory(true);
  rabbitTemplate.setConfirmCallback(confirmCallbackService);
  rabbitTemplate.setReturnCallback(returnCallbackService);
  rabbitTemplate.convertAndSend(exchange, routingKey, msg,
      message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
      },
      new CorrelationData(UUID.randomUUID().toString()));
}

三、消息接收确认

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

  @RabbitHandler
  public void processHandler(String msg, Channel channel, Message message) throws IOException {
    try {
      log.info("收到消息:{}", msg);
      // 具体业务逻辑
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      if (message.getMessageProperties().getRedelivered()) {
        log.error("消息已重复处理失败,拒绝再次接收...");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
      } else {
        log.error("消息即将再次返回队列处理...");
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
      }
    }
  }
}

消费消息有三种回执方法:

  • basicAck:表示成功确认,消息将被删除。
  • basicNack:表示失败确认,消息将重新入队列。
  • basicReject:拒绝消息,不能进行批量操作。

具体方法如下:

void basicAck(long deliveryTag, boolean multiple)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
void basicReject(long deliveryTag, boolean requeue)

四、测试

通过上述配置和代码,可以进行消息发送和接收的测试,验证消息确认机制的有效性。

五、常见问题及解决方案

开启消息确认机制时,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。

在处理业务逻辑时,如果发生异常,消息会被无限投递进队列,导致死循环。解决方案是先将消息进行应答,然后重新发送消息到队列尾部,保证消息不会丢失且正常业务可以继续进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(msg));

为了进一步优化,可以设置消息重试次数,达到重试上限后手动确认并删除消息,将消息持久化到 MySQL 并推送报警,进行人工处理和定时任务补偿。

如何保证 MQ 的消费是幂等性的,需要根据具体业务而定,可以通过 MySQL 或 Redis 将消息持久化,并通过消息中的唯一性属性进行校验。

至此,关于如何使用 Spring Boot + RabbitMQ 消息确认机制的学习就结束了,希望能够解决大家的疑惑。理论与实践相结合,能更好地帮助大家掌握这一技能,快去试试吧!若想继续学习更多相关知识,请继续关注编程笔记网站,小编会继续努力为大家带来更多实用的文章!


推荐阅读
  • Python数据类型6 字典
    字典Python的字典数据类型是基于hash散列算法实现的,采用键值对(key:value)的形式,根据key的值计算value的地址,具有非常快的查取和插入速度。但它是无序的,包 ... [详细]
  • 利用RabbitMQ实现高效延迟任务处理
    本文详细探讨了如何利用RabbitMQ实现延迟任务,包括其应用场景、实现原理、系统设计以及具体的Spring Boot实现方式。 ... [详细]
  • Node.js 中 GET 和 POST 请求的数据处理
    本文详细介绍了如何在 Node.js 中使用 GET 和 POST 方法来处理客户端发送的数据。通过示例代码展示了如何解析 URL 参数和表单数据,并提供了完整的实现步骤。 ... [详细]
  • 前文|功能型_品读鸿蒙HDF架构
    前文|功能型_品读鸿蒙HDF架构 ... [详细]
  • 本文介绍了两种使用Java发送短信的方法:利用第三方平台的HTTP请求和通过硬件设备短信猫。重点讲解了如何通过Java代码配置和使用短信猫发送短信的过程,包括必要的编码转换、串口操作及短信发送的核心逻辑。 ... [详细]
  • 本文详细探讨了JavaScript中的闭包与柯里化技术,这两者是函数式编程的重要组成部分,对提升代码的灵活性和可维护性具有重要作用。 ... [详细]
  • 本文详细介绍了在使用EmguCV进行图像处理时常用的函数及其应用场景,旨在帮助开发者更好地理解和利用这些工具。 ... [详细]
  • 本文详细介绍了如何使用Python中的xlwt库将数据库中的数据导出至Excel文件,适合初学者和中级开发者参考。 ... [详细]
  • 前言无论是对于刚入行工作还是已经工作几年的java开发者来说,面试求职始终是你需要直面的一件事情。首先梳理自己的知识体系,针对性准备,会有事半功倍的效果。我们往往会把重点放在技术上 ... [详细]
  • 本文介绍了如何计算给定数组中所有非质数元素的总和,并提供了多种编程语言的实现示例。 ... [详细]
  • 本文探讨了Java中char数据类型的特点,包括其表示范围以及如何处理超出16位字符限制的情况。通过引入代码点和代码单元的概念,详细解释了Java处理增补字符的方法。 ... [详细]
  • 在DELL Inspiron 14R上部署CentOS X64 6.4的详细步骤
    本文详细记录了在DELL Inspiron 14R笔记本电脑上安装CentOS X64 6.4操作系统的过程,包括遇到的问题及解决方法。 ... [详细]
  • 本文分析了一个基于ASP代码改编的PHP MD5加密函数,指出其存在的问题,并提供了解决方案。通过对比ASP和PHP在处理相同数据时的不同表现,探讨了两种语言在实现MD5算法上的细微差别。 ... [详细]
  • 计算机架构基础 —— 冯·诺依曼模型
    本文探讨了计算机科学的基础——冯·诺依曼体系结构,介绍了其核心概念、发展历程及面临的挑战。内容涵盖早期计算机的发展、图灵机的概念、穿孔卡的应用、香农定理的重要性以及冯·诺依曼体系结构的具体实现与当前存在的瓶颈。 ... [详细]
  • 本文详细介绍了Java库中`com.ait.tooling.nativetools.client.collection.NFastArrayList`类的构造函数`()`的使用方法,并提供了多个实际应用中的代码示例,帮助开发者更好地理解和使用这一高效的数据结构。 ... [详细]
author-avatar
UP7家族--婵婵_172
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有