热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot+RabbitMQ消息确认机制详解

本文详细介绍如何在SpringBoot项目中使用RabbitMQ的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。

本文详细介绍了如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认。希望通过本文能够帮助开发者解决在实际操作中可能遇到的问题。

一、环境准备

首先,需要在项目中添加 Spring Boot AMQP 依赖:


  org.springframework.boot
  spring-boot-starter-amqp

然后,配置 RabbitMQ 连接信息和消息确认机制:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送者确认机制
spring.rabbitmq.publisher-cOnfirms=true
# 开启发送者返回机制
spring.rabbitmq.publisher-returns=true
# 设置消费端手动 ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

定义交换机和队列,并将队列绑定到交换机上:

@Configuration
public class QueueConfig {

  @Bean(name = "confirmTestQueue")
  public Queue confirmTestQueue() {
    return new Queue("confirm_test_queue", true, false, false);
  }

  @Bean(name = "confirmTestExchange")
  public FanoutExchange confirmTestExchange() {
    return new FanoutExchange("confirmTestExchange");
  }

  @Bean
  public Binding confirmTestFanoutExchangeAndQueue(
      @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
      @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
    return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
  }
}

RabbitMQ 的消息确认分为两部分:发送消息确认和消息接收确认。

Spring Boot + RabbitMQ 消息确认机制

二、消息发送确认

当消息被 RabbitMQ Broker 接收到时,会触发 ConfirmCallback 回调:

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
      log.error("消息发送异常!");
    } else {
      log.info("发送者已收到确认,correlatiOnData={}, ack={}, cause={}", correlationData.getId(), ack, cause);
    }
  }
}

实现 ConfirmCallback 接口,重写 confirm() 方法,参数包括 correlationData、ack 和 cause:

  • correlationData:包含消息的唯一标识 ID。
  • ack:消息是否成功投递到 Broker,true 表示成功。
  • cause:投递失败的原因。

如果消息未能投递到目标队列,将触发 ReturnCallback 回调:

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
  }
}

实现 ReturnCallback 接口,重写 returnedMessage() 方法,参数包括 message、replyCode、replyText、exchange 和 routingKey。

具体的消息发送代码如下:

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ConfirmCallbackService confirmCallbackService;

@Autowired
private ReturnCallbackService returnCallbackService;

public void sendMessage(String exchange, String routingKey, Object msg) {
  rabbitTemplate.setMandatory(true);
  rabbitTemplate.setConfirmCallback(confirmCallbackService);
  rabbitTemplate.setReturnCallback(returnCallbackService);
  rabbitTemplate.convertAndSend(exchange, routingKey, msg,
      message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
      },
      new CorrelationData(UUID.randomUUID().toString()));
}

三、消息接收确认

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

  @RabbitHandler
  public void processHandler(String msg, Channel channel, Message message) throws IOException {
    try {
      log.info("收到消息:{}", msg);
      // 具体业务逻辑
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      if (message.getMessageProperties().getRedelivered()) {
        log.error("消息已重复处理失败,拒绝再次接收...");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
      } else {
        log.error("消息即将再次返回队列处理...");
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
      }
    }
  }
}

消费消息有三种回执方法:

  • basicAck:表示成功确认,消息将被删除。
  • basicNack:表示失败确认,消息将重新入队列。
  • basicReject:拒绝消息,不能进行批量操作。

具体方法如下:

void basicAck(long deliveryTag, boolean multiple)
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
void basicReject(long deliveryTag, boolean requeue)

四、测试

通过上述配置和代码,可以进行消息发送和接收的测试,验证消息确认机制的有效性。

五、常见问题及解决方案

开启消息确认机制时,消费消息别忘了 channel.basicAck,否则消息会一直存在,导致重复消费。

在处理业务逻辑时,如果发生异常,消息会被无限投递进队列,导致死循环。解决方案是先将消息进行应答,然后重新发送消息到队列尾部,保证消息不会丢失且正常业务可以继续进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
    JSON.toJSONBytes(msg));

为了进一步优化,可以设置消息重试次数,达到重试上限后手动确认并删除消息,将消息持久化到 MySQL 并推送报警,进行人工处理和定时任务补偿。

如何保证 MQ 的消费是幂等性的,需要根据具体业务而定,可以通过 MySQL 或 Redis 将消息持久化,并通过消息中的唯一性属性进行校验。

至此,关于如何使用 Spring Boot + RabbitMQ 消息确认机制的学习就结束了,希望能够解决大家的疑惑。理论与实践相结合,能更好地帮助大家掌握这一技能,快去试试吧!若想继续学习更多相关知识,请继续关注编程笔记网站,小编会继续努力为大家带来更多实用的文章!


推荐阅读
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • 在前两篇文章中,我们探讨了 ControllerDescriptor 和 ActionDescriptor 这两个描述对象,分别对应控制器和操作方法。本文将基于 MVC3 源码进一步分析 ParameterDescriptor,即用于描述 Action 方法参数的对象,并详细介绍其工作原理。 ... [详细]
  • 本文深入探讨了 Java 中的 Serializable 接口,解释了其实现机制、用途及注意事项,帮助开发者更好地理解和使用序列化功能。 ... [详细]
  • DNN Community 和 Professional 版本的主要差异
    本文详细解析了 DotNetNuke (DNN) 的两种主要版本:Community 和 Professional。通过对比两者的功能和附加组件,帮助用户选择最适合其需求的版本。 ... [详细]
  • XNA 3.0 游戏编程:从 XML 文件加载数据
    本文介绍如何在 XNA 3.0 游戏项目中从 XML 文件加载数据。我们将探讨如何将 XML 数据序列化为二进制文件,并通过内容管道加载到游戏中。此外,还会涉及自定义类型读取器和写入器的实现。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • c# – UWP:BrightnessOverride StartOverride逻辑 ... [详细]
  • 本文详细介绍了如何使用 Yii2 的 GridView 组件在列表页面实现数据的直接编辑功能。通过具体的代码示例和步骤,帮助开发者快速掌握这一实用技巧。 ... [详细]
  • 使用 Azure Service Principal 和 Microsoft Graph API 获取 AAD 用户列表
    本文介绍了一段通用代码示例,该代码不仅能够操作 Azure Active Directory (AAD),还可以通过 Azure Service Principal 的授权访问和管理 Azure 订阅资源。Azure 的架构可以分为两个层级:AAD 和 Subscription。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • Android 渐变圆环加载控件实现
    本文介绍了如何在 Android 中创建一个自定义的渐变圆环加载控件,该控件已在多个知名应用中使用。我们将详细探讨其工作原理和实现方法。 ... [详细]
author-avatar
UP7家族--婵婵_172
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有