热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQDLX(DeadLetterExchanges)实现延迟队列

一、延迟队列设计队列中的消息成为死信(DeadLetter)的几种情况:1)Themessageisrejected(basic.rejectorbasic.na
一、延迟队列设计

队列中的消息成为死信(Dead Letter)的几种情况:

1) The message is rejected (basic.reject or basic.nack) with requeue=false,

2) The TTL for the message expires; or

3) The queue length limit is exceeded.

而通过配置队列的x-dead-letter-exchange及x-dead-letter-routing-key键值,Dead Letter就会被重新发送到指定的DLX中。

 


如上图所示,注意延迟队列并没有消费者,并且设置参数:

x-dead-letter-exchange:DLX

x-dead-letter-routing-key:DLK

x-max-length:1000000 // length = 1million

x-message-ttl:30000 // TTL = 30s

生产者发布的消息路由到该延迟队列,30秒后消息成为死信并被重新发送到死信交换器DLX中;消费者订阅死信队列DLQ,消费消息。

二、
延迟队列实现


  1. @Configuration
  2. @PropertySource("classpath:rabbitmq-cfg.properties")
  3. public class RabbitConfig {
  4. /**
  5. * 配置RabbitMQ连接工厂
  6. *
  7. * @param host
  8. * @param port
  9. * @param username
  10. * @param password
  11. * @param virtualHost
  12. * @param connectionTimeout
  13. * @return
  14. */
  15. @Bean
  16. public ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.host}") String host,
  17. @Value("${spring.rabbitmq.port}") String port, @Value("${spring.rabbitmq.username}") String username,
  18. @Value("${spring.rabbitmq.password}") String password,
  19. @Value("${spring.rabbitmq.virtual-host}") String virtualHost,
  20. @Value("${spring.rabbitmq.connection-timeout}") String connectionTimeout) {
  21. CachingConnectionFactory ret = new CachingConnectionFactory();
  22. ret.setHost(host);
  23. ret.setPort(Integer.parseInt(port));
  24. ret.setUsername(username);
  25. ret.setPassword(password);
  26. ret.setVirtualHost(virtualHost);
  27. ret.setConnectionTimeout(Integer.parseInt(connectionTimeout));
  28. return ret;
  29. }

  30. /**
  31. * 声明30s延迟队列
  32. *
  33. * @param name
  34. * @return
  35. */
  36. @Bean
  37. public Queue queueDelay30s(@Value("${atf.queue.delay.30s}") String name) {
  38. Map arguments = new HashMap();
  39. arguments.put("x-message-ttl", 30 * 1000); // Message TTL = 30s
  40. arguments.put("x-max-length", 1000000); // Max length = 1million
  41. // 死信路由到死信交换器DLX
  42. arguments.put("x-dead-letter-exchange", "atf.exchange.delay.dlx");
  43. arguments.put("x-dead-letter-routing-key", "atf.routingkey.delay.dlk");
  44. return new Queue(name, true, false, false, arguments);
  45. }

  46. /**
  47. * 声明死信队列DLQ
  48. *
  49. * @param name
  50. * @return
  51. */
  52. @Bean
  53. public Queue queueDelayDlq(@Value("${atf.queue.delay.dlq}") String name) {
  54. return new Queue(name, true, false, false);
  55. }

  56. /**
  57. * 声明延迟交换器
  58. *
  59. * @param name
  60. * @return
  61. */
  62. @Bean
  63. public DirectExchange exchangeDelay(@Value("${atf.exchange.delay}") String name) {
  64. DirectExchange ret = new DirectExchange(name, true, false);
  65. ret.setInternal(false);
  66. return ret;
  67. }

  68. /**
  69. * 声明死信交换器DLX
  70. *
  71. * @param name
  72. * @return
  73. */
  74. @Bean
  75. public DirectExchange exchangeDelayDlx(@Value("${atf.exchange.delay.dlx}") String name) {
  76. return new DirectExchange(name, true, false);
  77. }

  78. /**
  79. * 绑定延迟交换器和30s延迟队列
  80. *
  81. * @param queue
  82. * @param exchange
  83. * @param routingKey
  84. * @return
  85. */
  86. @Bean
  87. public Binding bindingDelay(@Qualifier("queueDelay30s") Queue queue,
  88. @Qualifier("exchangeDelay") DirectExchange exchange,
  89. @Value("${atf.routingkey.delay.30s}") String routingKey) {
  90. return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  91. }

  92. /**
  93. * 绑定死信交换器DLX和死信队列DLQ
  94. *
  95. * @param queue
  96. * @param exchange
  97. * @param routingKey
  98. * @return
  99. */
  100. @Bean
  101. public Binding bindingDelayConsume(@Qualifier("queueDelayDlq") Queue queue,
  102. @Qualifier("exchangeDelayDlx") DirectExchange exchange,
  103. @Value("${atf.routingkey.delay.dlk}") String routingKey) {
  104. return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  105. }
  106. }


  1. public void sendDelay(AtfEventPayload payload) {
  2. rabbitTemplate.convertAndSend("atf.exchange.delay", "atf.routingkey.delay.30s", payload);
  3. }


  1. @RabbitListener(queues = "atf.queue.delay.dlq", containerFactory = "rabbitListenerContainerFactory")
  2. public void consumeDelay(AtfEventPayload payload) {
  3. try {
  4. AtWorkflow workflow = workflowSvs.find(payload.getWorkflowName());
  5. workflowSvs.execute(workflow, null);
  6. } catch (Exception e) {
  7. e.printStackTrace();
  8. }
  9. }

像支付宝,微信支付后的回调机制,可以使用延迟队列实现。可是创建延时不同时间的队列,死信后被消费,消费异常后,消息进入不同的队列。

/**
* 默认及时消息交换机
* @return
*/
@Bean()
public DirectExchange defaultDirectExchange() {return new DirectExchange(MessageQueueConstants.DEFAULT_DIRECT_EXCHANGE_NAME, true, false);
}/**
* 默认延迟消息死信队列
* @return
*/
@Bean
public Queue defaultDeadLetterQueue10() {Map, Object> arguments &#61; new HashMap<>();
arguments.put("x-dead-letter-exchange",MessageQueueConstants.DEFAULT_DIRECT_EXCHANGE_NAME);//设置交换机路由
arguments.put("x-dead-letter-routing-key", MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME);//设置转发队列名称
arguments.put("x-message-ttl", 10 * 1000); // Message TTL &#61; 10s
Queue queue &#61; new Queue(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME10,true,false,false,arguments);
return queue;
}/**
* 默认延迟消息死信队列
* &#64;return
*/
&#64;Bean
public Queue defaultDeadLetterQueue30() {Map, Object> arguments &#61; new HashMap<>();
arguments.put("x-dead-letter-exchange",MessageQueueConstants.DEFAULT_DIRECT_EXCHANGE_NAME);//设置交换机路由
arguments.put("x-dead-letter-routing-key", MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME);//设置转发队列名称
arguments.put("x-message-ttl", 30 * 1000); // Message TTL &#61; 30s
Queue queue &#61; new Queue(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME30,true,false,false,arguments);
return queue;
}/**
* 默认延迟消息死信队列
* &#64;return
*/
&#64;Bean
public Queue defaultDeadLetterQueue180() {Map, Object> arguments &#61; new HashMap<>();
arguments.put("x-dead-letter-exchange",MessageQueueConstants.DEFAULT_DIRECT_EXCHANGE_NAME);//设置交换机路由
arguments.put("x-dead-letter-routing-key", MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME);//设置转发队列名称
arguments.put("x-message-ttl", 1000*60*3); // Message TTL &#61; 180s
Queue queue &#61; new Queue(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME180,true,false,false,arguments);
return queue;
}/**
* 默认延迟消息死信队列
* &#64;return
*/
&#64;Bean
public Queue defaultDeadLetterQueue1200() {Map, Object> arguments &#61; new HashMap<>();
arguments.put("x-dead-letter-exchange",MessageQueueConstants.DEFAULT_DIRECT_EXCHANGE_NAME);//设置交换机路由
arguments.put("x-dead-letter-routing-key", MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME);//设置转发队列名称
arguments.put("x-message-ttl", 1000*60*20); // Message TTL &#61; 1200s
Queue queue &#61; new Queue(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME1200,true,false,false,arguments);
return queue;
}/**
* 默认延迟消息死信队列
* &#64;return
*/
&#64;Bean
public Queue defaultDeadLetterQueue3600() {Map, Object> arguments &#61; new HashMap<>();
arguments.put("x-dead-letter-exchange",MessageQueueConstants.DEFAULT_DIRECT_EXCHANGE_NAME);//设置交换机路由
arguments.put("x-dead-letter-routing-key", MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME);//设置转发队列名称
arguments.put("x-message-ttl", 1000*60*60); // Message TTL &#61; 3600s
Queue queue &#61; new Queue(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME3600,true,false,false,arguments);
return queue;
}&#64;Bean
public Binding defaultDeadLetterBinding10() {Binding bind &#61; BindingBuilder.bind(defaultDeadLetterQueue10()).to(defaultDirectExchange()).with(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME10);
return bind;
}
&#64;Bean
public Binding defaultDeadLetterBinding30() {Binding bind &#61; BindingBuilder.bind(defaultDeadLetterQueue30()).to(defaultDirectExchange()).with(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME30);
return bind;
}
&#64;Bean
public Binding defaultDeadLetterBinding180() {Binding bind &#61; BindingBuilder.bind(defaultDeadLetterQueue180()).to(defaultDirectExchange()).with(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME180);
return bind;
}
&#64;Bean
public Binding defaultDeadLetterBinding1200() {Binding bind &#61; BindingBuilder.bind(defaultDeadLetterQueue1200()).to(defaultDirectExchange()).with(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME1200);
return bind;
}
&#64;Bean
public Binding defaultDeadLetterBinding3600() {Binding bind &#61; BindingBuilder.bind(defaultDeadLetterQueue3600()).to(defaultDirectExchange()).with(MessageQueueConstants.DEFAULT_DEAD_LETTER_QUEUE_NAME3600);
return bind;
}/**
* 默认延迟消息死信接受转发消息队列
* &#64;return
*/
&#64;Bean
public Queue defaultRepeatTradeQueue() {Queue queue &#61; new Queue(MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME,true,false,false);
return queue;
}&#64;Bean
public Binding defaultRepeatTradeBinding() {return BindingBuilder.bind(defaultRepeatTradeQueue()).to(defaultDirectExchange()).with(MessageQueueConstants.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
}


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 题目旨在解决树上的路径最优化问题,具体为在给定的树中寻找一条长度介于L到R之间的路径,使该路径上的边权平均值最大化。通过点分治策略,可以有效地处理此类问题。若无长度限制,可采用01分数规划模型,将所有边权减去一个常数m,从而简化计算过程。此外,利用单调队列优化动态规划过程,进一步提高算法效率。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • Prim算法在处理稠密图时表现出色,尤其适用于边数远多于顶点数的情形。传统实现的时间复杂度为 \(O(n^2)\),但通过引入优先队列进行优化,可以在点数为 \(m\)、边数为 \(n\) 的情况下显著降低时间复杂度,提高算法效率。这种优化方法不仅能够加速最小生成树的构建过程,还能在大规模数据集上保持良好的性能表现。 ... [详细]
  • Java 中优先级队列的轮询方法详解与应用 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 本研究基于状态空间方法,通过动态可视化技术实现了汉诺塔问题的求解过程,即将n个盘子从A柱移动到C柱。本文提供了一个使用C语言在控制台进行动画绘制的示例,并详细注释了程序逻辑,以帮助读者更好地理解和学习该算法。 ... [详细]
  • Java 并发容器 ConcurrentLinkedQueue 的 peek() 方法解析与应用 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • 深入解析队列机制及其广泛的应用场景
    本文深入探讨了队列机制的核心原理及其在多种应用场景中的广泛应用。首先,文章详细解析了队列的基本概念、操作方法及其时间复杂度。接着,通过具体实例,阐述了队列在操作系统任务调度、网络通信、事件处理等领域的实际应用。此外,文章还对比了队列与其他常见数据结构(如栈、数组和链表)的优缺点,帮助读者更好地理解和选择合适的数据结构。最后,通过具体的编程示例,进一步巩固了对队列机制的理解和应用。 ... [详细]
  • `ArrayDeque` 类中的 `removeLast()` 方法用于移除并返回双端队列中的最后一个元素。该方法在处理数据结构时非常有用,特别是在需要高效地从队列末尾移除元素的场景中。本文将详细介绍 `removeLast()` 方法的工作原理,并通过具体的应用实例展示其使用方法和优势。 ... [详细]
  • 本文探讨了Huffman树在数据结构中的应用及其原理。Huffman树,即哈夫曼树,是一种高效的数据压缩技术,通过构建最优二叉树实现编码,广泛应用于文件压缩和网络传输中,有效减少数据存储和传输的空间需求。 ... [详细]
author-avatar
挖墙找红杏000
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有