热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【SpringBoot】60、SpringBoot中整合RabbitMQ实现延时队列(死信队列篇)

延时插件实现篇,参考文章:【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

延时插件实现篇,参考文章:【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

一、实现原理


  • 1、什么是死信队列

死信队列:DLX,dead-letter-exchange
利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX


  • 2、消息变成死信有以下几种情况

消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度


  • 3、死信处理过程

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。


  • 4、死信队列实现延时消息原理

我们将消息发送到消息队列中,并设置一个过期时间,该队列没有消费者
消息的过期时间到了之后,由于没有消费者,就会进入死信队列
我们用一个消费者接收死信队列的消息,就能达到延迟消息的目的


二、实现过程


  • 1、引入 maven 依赖

<!-- rabbitmq消息队列 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • 2、配置 MQ 连接信息

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

  • 3、创建队列&#xff0c;并绑定交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;&#64;Configuration
public class RabbitMqConfig {/*** 死信队列*/public static final String DLX_QUEUE &#61; "dlx_queue";public static final String DLX_EXCHANGE &#61; "dlx_exchange";public static final String DLX_ROUTING_KEY &#61; "dlx_routing_key";/*** 正常队列*/public static final String MSG_QUEUE &#61; "msg_queue";public static final String MSG_EXCHANGE &#61; "msg_exchange";public static final String MSG_ROUTING_KEY &#61; "msg_routing_key";/*** 死信队列** &#64;return*/&#64;BeanQueue dlxQueue() {return new Queue(DLX_QUEUE, true, false, false);}/*** 死信交换机** &#64;return*/&#64;BeanDirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE, true, false);}/*** 绑定死信队列和死信交换机** &#64;return*/&#64;BeanBinding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/*** 普通消息队列** &#64;return*/&#64;BeanQueue msgQueue() {Map<String, Object> args &#61; new HashMap<>();//设置消息过期时间args.put("x-message-ttl", 1000 * 10);//设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE);//设置死信 routing_keyargs.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(MSG_QUEUE, true, false, false, args);}/*** 普通交换机** &#64;return*/&#64;BeanDirectExchange msgExchange() {return new DirectExchange(MSG_EXCHANGE, true, false);}/*** 绑定普通队列和与之对应的交换机** &#64;return*/&#64;BeanBinding msgBinding() {return BindingBuilder.bind(msgQueue()).to(msgExchange()).with(MSG_ROUTING_KEY);}
}

分别创建了私信队列和普通消息队列&#xff0c;普通消息队列设置了消息过期时间为 10s&#xff0c;设置了死信交换机、死信 routing_key&#xff0c;消息过期之后就能进入了死信队列中。

  • 4、死信队列消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;&#64;Slf4j
&#64;Component
public class DlxConsumer {&#64;RabbitListener(queues &#61; RabbitMqConfig.DLX_QUEUE)public void handle(String message) {log.info("------------------收到消息&#xff1a;" &#43; message);}
}

  • 5、普通队列消息生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;&#64;Slf4j
&#64;Component
public class MsgProducer {&#64;Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String message) {try {rabbitTemplate.convertAndSend(RabbitMqConfig.MSG_EXCHANGE, RabbitMqConfig.MSG_ROUTING_KEY, message);log.info("-------------------消息发送成功");} catch (Exception e) {log.error("-------------------消息发送失败");}}
}

三、测试


  • 1、测试用例

import com.asurplus.common.rabbitmq.MsgProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;&#64;RestController
public class TestController {&#64;Autowiredprivate MsgProducer msgProducer;&#64;GetMapping("test")public String test() {msgProducer.sendMsg("今天天气好好");return "success";}
}

  • 2、发送消息

访问接口&#xff1a;

localhost:8080/test

在这里插入图片描述
消息发送成功&#xff0c;10s 后&#xff1a;
在这里插入图片描述
收到了消息&#xff0c;证明我的延时消息已经成功

如您在阅读中发现不足&#xff0c;欢迎留言&#xff01;&#xff01;&#xff01;


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
  • 本项目在Java Maven框架下,利用POI库实现了Excel数据的高效导入与导出功能。通过优化数据处理流程,提升了数据操作的性能和稳定性。项目已发布至GitHub,当前最新版本为0.0.5。该项目不仅适用于小型应用,也可扩展用于大型企业级系统,提供了灵活的数据管理解决方案。GitHub地址:https://github.com/83945105/holygrail,Maven坐标:`com.github.83945105:holygrail:0.0.5`。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 本文将深入探讨Java编程语言中顶级类`Object`的源码实现,旨在为Java新手提供进阶指导。`Object`类是所有Java类的基类,了解其内部机制对于提升编程技能至关重要。文章首先介绍了API文档的使用方法,这对于有开发经验的Java程序员来说是不可或缺的工具。通过详细解析`Object`类的关键方法和属性,读者可以更好地理解Java的核心原理和设计思想。此外,文章还提供了实际代码示例,帮助读者在实践中掌握这些知识。 ... [详细]
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • Ceph API微服务实现RBD块设备的高效创建与安全删除
    本文旨在实现Ceph块存储中RBD块设备的高效创建与安全删除功能。开发环境为CentOS 7,使用 IntelliJ IDEA 进行开发。首先介绍了 librbd 的基本概念及其在 Ceph 中的作用,随后详细描述了项目 Gradle 配置的优化过程,确保了开发环境的稳定性和兼容性。通过这一系列步骤,我们成功实现了 RBD 块设备的快速创建与安全删除,提升了系统的整体性能和可靠性。 ... [详细]
  • 在处理高并发场景时,确保业务逻辑的正确性是关键。本文深入探讨了Java原生锁机制的多种细粒度实现方法,旨在通过使用数据的时间戳、ID等关键字段进行锁定,以最小化对系统性能的影响。文章详细分析了不同锁策略的优缺点,并提供了实际应用中的最佳实践,帮助开发者在高并发环境下高效地实现锁机制。 ... [详细]
  • Liferay Portal 中 AutoEscape 构造函数的应用与实例代码解析 ... [详细]
  • 在Spring框架中,基于Schema的异常通知与环绕通知的实现方法具有重要的实践价值。首先,对于异常通知,需要创建一个实现ThrowsAdvice接口的通知类。尽管ThrowsAdvice接口本身不包含任何方法,但开发者需自定义方法来处理异常情况。此外,环绕通知则通过实现MethodInterceptor接口来实现,允许在方法调用前后执行特定逻辑,从而增强功能或进行必要的控制。这两种通知机制的结合使用,能够有效提升应用程序的健壮性和灵活性。 ... [详细]
  • 深入解析零拷贝技术(Zerocopy)及其应用优势
    零拷贝技术(Zero-copy)是Netty框架中的一个关键特性,其核心在于减少数据在操作系统内核与用户空间之间的传输次数。通过避免不必要的内存复制操作,零拷贝显著提高了数据传输的效率和性能。本文将深入探讨零拷贝的工作原理及其在实际应用中的优势,包括降低CPU负载、减少内存带宽消耗以及提高系统吞吐量等方面。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • 本文介绍了一种使用HashSet迭代器的方法来验证字符串集合中的元素。具体而言,通过迭代器遍历名为“名称”的字符串哈希集,并删除其中包含任何非大写字母的项。该方法能够高效地过滤出不符合条件的字符串,确保集合中仅保留纯大写字母的条目。 ... [详细]
  • 本研究基于状态空间方法,通过动态可视化技术实现了汉诺塔问题的求解过程,即将n个盘子从A柱移动到C柱。本文提供了一个使用C语言在控制台进行动画绘制的示例,并详细注释了程序逻辑,以帮助读者更好地理解和学习该算法。 ... [详细]
  • 本文深入探讨了Java枚举类型的使用与实践,详细解析了枚举的基本用法及其在实际开发中的应用。首先介绍了枚举作为常量的替代方案,自JDK 1.5起,通过枚举可以更加简洁、安全地定义常量,避免了传统方式中可能出现的错误。此外,文章还探讨了枚举在实现单例模式、状态机等场景中的优势,并提供了多个实际案例,帮助开发者更好地理解和运用这一强大的语言特性。 ... [详细]
author-avatar
注定的等待幸福的到来
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有