热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ(四)实现延迟消息

1概述常用的延迟消息实现方式有:利用队列TTL死信队列方式实现利用消息延迟插件实现消息变成死信的原因有:​​​​消息过期。消息TTL或队列TTL消息

1 概述

常用的延迟消息实现方式有:


  • 利用 队列TTL + 死信队列 方式实现

  • 利用消息延迟插件实现

消息变成死信的原因有:​​​​


  • 消息过期。消息TTL或队列TTL

  • 消息被拒绝。消费者调用了 channel.basicNackchannel.basicReject ,并且设置 requeue=false

  • 队列满。

    当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的 overflow 设置

    overflow

    常见参数说明


2 队列TTL + 死信队列方式

这里直接贴出 rabbitConfig 代码,其他的代码参考该文章:RabbitMQ (三)消息重试


1 RabbitConfig

主要操作:


  1. 创建死信队列和交换器,并绑定

  2. 创建队列,同时设置队列的TTL、绑定死信队列;创建交换器,并绑定,

package com.fmi110.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;


/*** @author fmi110* @description rabbitMQ 配置类* @date 2021/7/1 15:08*/
@Configuration
@Slf4j
public class RabbitConfig {
​String dlQueueName  = "my-queue-dl"; // 普通队列名称String dlExchangeName = "my-exchange-dl"; // 死信交换器名称String dlRoutingKey   = "rabbit.test";
​String queueName = "retry-queue";String exchangeName = "my-exchange"; // 普通交换器名称
​/*** 创建死信队列** @return*/@Beanpublic Queue queueDL() {
​return QueueBuilder.durable(dlQueueName) // 持久化队列.build();}
​/*** 创建死信交换机** @return*/@Beanpublic TopicExchange exchangeDL() {return new TopicExchange(dlExchangeName, true, false);}
​/*** 绑定操作*/@Beanpublic Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {log.info(">>>> 队列与交换器绑定");return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);}
​/*** 创建持久化队列,同时绑定死信交换器** &#64;return*/&#64;Beanpublic Queue queue() {log.info(">>>> 创建队列 retry-queue");HashMap params &#61; new HashMap<>();params.put("x-dead-letter-exchange", dlExchangeName);params.put("x-dead-letter-routing-key", dlRoutingKey);
​params.put("x-message-ttl", 10 * 1000); // 队列过期时间 10s
​return QueueBuilder.durable(queueName) // 持久化队列.withArguments(params) // 关联死信交换器.build();}

​/*** 创建交换机** &#64;return*/&#64;Beanpublic TopicExchange exchange() {log.info(">>>> 创建交换器 my-exchange");boolean durable    &#61; true; // 持久化boolean autoDelete &#61; false; // 消费者全部解绑时不自动删除return new TopicExchange(exchangeName, durable, autoDelete);}
​/*** 绑定队列到交换机** &#64;param queue* &#64;param exchange* &#64;return*/&#64;Beanpublic Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {log.info(">>>> 队列与交换器绑定");return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");}

}



2 RabbitConsumer 消费者

延迟消息通过队列的TTL产生&#xff0c;所以这里不应该设置普通队列的消费者&#xff0c;让消息过期然后自动转入死信队列&#xff0c;此时再进行消费以此实现延迟消息

package com.fmi110.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;


/*** &#64;author fmi110* &#64;description 消息消费者* &#64;date 2021/7/1 16:08*/
&#64;Component
&#64;Slf4j
public class RabbitConsumer {/*** 死信队列消费者* &#64;param data* &#64;param channel* &#64;param tag* &#64;throws Exception*/&#64;RabbitListener(queues&#61;"my-queue-dl")public void consumeDL(String data, Channel channel, &#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{SimpleDateFormat dateFormat &#61; new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");log.info(">>>> {} 死信队列消费 tag &#61; {},消息内容 : {}", dateFormat.format(new Date()), tag, data);}
}



3 弊端

如上图所示实现了延迟10s的消息&#xff0c;但是如果需要实现延迟5s的消息&#xff0c;则需要新建一个TTL为5s的队列&#xff0c;所以如果延迟时间需要很多的话&#xff0c;就需要创建很多队列&#xff0c;实现起来比较麻烦。

再贴一段对消息设置TTL的代码&#xff1a;

   AtomicInteger aint &#61; new AtomicInteger();public void send(String msg) {String exchangeName &#61; "my-exchange";String routingKey   &#61; "rabbit.test";// rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);SimpleDateFormat dateFormat &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
​MessageProperties messageProperties &#61; new MessageProperties();messageProperties.setCorrelationId(UUID.randomUUID().toString().replace("-", ""));// TTL 为5sint i &#61; 9 * 1000;
​if (aint.incrementAndGet() % 2 &#61;&#61; 0) {i &#61; 5 * 1000;}msg &#61; "message send at " &#43; dateFormat.format(new Date()) &#43;", expired at "&#43;dateFormat.format(new Date().getTime()&#43;i);messageProperties.setExpiration(String.valueOf(i)); // 设置过期时间Message message &#61; new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.send(exchangeName, routingKey, message);}

可以看到消息的过期时间与期望的不一致。因为队列是先进后出的&#xff0c;只有在头部的消息&#xff0c;系统才对其进行过期检测。所以如果消息不再队列头部&#xff0c;即使时间已经过期&#xff0c;也不会导致消息进入死信队列&#xff01;&#xff01;&#xff01;

当同时设置了消息的TTL和队列的TTL时&#xff0c;过期时间谁小谁生效&#xff08;队列头部的消息才进行TTL检测&#xff09;。


3 使用延迟插件实现

插件的安装参考 docker安装rabbitMQ


1 RabbitConfig

使用延迟插件实现&#xff0c;需要创建延迟交换器&#xff0c;使用 CustomExchange 类创建&#xff0c;同时指定交换器类型为 x-delayed-message &#xff0c;此外还需要设置属性 x-delayed-type &#xff0c;创建的交换器如下图所示

package com.fmi110.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/*** &#64;author fmi110* &#64;description 配置交换器、队列* &#64;date 2021/7/3 9:58*/
&#64;Slf4j
&#64;Configuration
public class RabbitConfig2 {
​String exchangeName &#61; "delay-exchange";String queueName    &#61; "delay-queue";String exchangeType &#61; "x-delayed-message";
​&#64;Beanpublic CustomExchange exchange() {
​HashMap args &#61; new HashMap<>();args.put("x-delayed-type", "topic");return new CustomExchange(exchangeName, exchangeType, true, false, args);}
​&#64;Beanpublic Queue queue() {return new Queue(queueName, true, false, false);}
​&#64;Beanpublic Binding binding(CustomExchange exchange, Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("rabbit.delay").noargs();
​}
}

2 RabbitProducer

这里开启了消息投递失败回调。测试中发现&#xff0c;使用延迟插件&#xff0c;虽然消息正常投递了&#xff0c;但是始终会报 “NO_ROUTER” 提示路由失败。虽然不影响功能。运行截图见后文。目前不确定是我设置问题还是框架的问题...

package com.fmi110.rabbitmq;

import com.rabbitmq.client.AMQP;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/*** &#64;author fmi110* &#64;description 消息生产者* &#64;date 2021/7/1 15:08*/
&#64;Component
&#64;Slf4j
public class RabbitProducer {&#64;AutowiredRabbitTemplate rabbitTemplate;
​/*** 1 设置 confirm 回调&#xff0c;消息发送到 exchange 时回调* 2 设置 return callback ,当路由规则无法匹配到消息队列时&#xff0c;回调*

* correlationData&#xff1a;消息发送时&#xff0c;传递的参数&#xff0c;里边只有一个id属性&#xff0c;标识消息用*/&#64;PostConstructpublic void enableConfirmCallback() {// #1/*** 连接不上 exchange或exchange不存在时回调*/rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (!ack) {log.error("消息发送失败");// TODO 记录日志&#xff0c;发送通知等逻辑}});
​// #2/*** 消息投递到队列失败时&#xff0c;才会回调该方法* message:发送的消息* exchange:消息发往的交换器的名称* routingKey:消息携带的路由关键字信息*/rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.error("{}&#xff0c;exchange&#61;{},routingKey&#61;{}",replyText,exchange,routingKey);// TODO 路由失败后续处理逻辑});}
​public void sendDelayMsg(String delay) {int               delayInt          &#61; StringUtils.isEmpty(delay) ? 0 : Integer.valueOf(delay);String            exchangeName      &#61; "delay-exchange";String            routingKey        &#61; "rabbit.delay";SimpleDateFormat  dateFormat        &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String            msg               &#61; "message send at " &#43; dateFormat.format(new Date()) &#43; ", expired at " &#43; dateFormat.format(new Date().getTime() &#43; delayInt * 1000);
//       MessageProperties messageProperties &#61; new MessageProperties();
//       messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息
//       messageProperties.setDelay(delayInt * 1000);
//       Message message &#61; new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
//       rabbitTemplate.send(exchangeName,routingKey,message);
​rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, message ->{message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化message.getMessageProperties().setDelay(delayInt * 1000);   // 单位为毫秒return message;});
​}
}

3 RabbitConsumer

消费者&#xff0c;指定监听对应的消息队列即可。

package com.fmi110.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;


/*** &#64;author fmi110* &#64;description 消息消费者* &#64;date 2021/7/1 16:08*/
&#64;Component
&#64;Slf4j
public class RabbitConsumer {
​&#64;RabbitListener(queues&#61;"delay-queue")public void consumeDelay(String data, Channel channel, &#64;Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{SimpleDateFormat dateFormat &#61; new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");log.info(">>>> {} 延迟队列消费 tag &#61; {},消息内容 : {}", dateFormat.format(new Date()), tag, data);}
}



4 controller

package com.fmi110.rabbitmq.controller;


import com.fmi110.rabbitmq.RabbitProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import sun.rmi.runtime.Log;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
&#64;Slf4j
&#64;RestController
public class TestController {&#64;AutowiredRabbitProducer rabbitProducer;
​&#64;GetMapping("/delay")public Object delay(String delay) {rabbitProducer.sendDelayMsg(delay); // 发送消息HashMap result &#61; new HashMap<>();result.put("code", 0);result.put("msg", "success");return result;}
}

5 依赖

   

   org.springframework.bootspring-boot-starter-web
org.springframework.bootspring-boot-starter-amqp
org.projectlomboklombokcompile

6 运行截图


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 如何使用 net.sf.extjwnl.data.Word 类及其代码示例详解 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
  • 探讨 jBPM 数据库表结构设计的精要与实践
    探讨 jBPM 数据库表结构设计的精要与实践 ... [详细]
  • 精通jQuery:深入解析事件处理机制与应用技巧
    本文详细探讨了jQuery的事件处理机制及其应用技巧,通过具体的代码示例,逐一解析了每个jQuery代码片段与其对应的HTML结构。文章以标记为基准,CSS作为通用样式,确保每段代码都能独立运行。HTML和CSS代码统一放置在文章末尾,方便读者参考和实践。 ... [详细]
  • 深入解析 org.hibernate.event.spi.EventSource.getFactory() 方法及其应用实例 ... [详细]
  • 探讨 javax.jms.JMSException 中 getLocalizedMessage 方法的应用与实例代码分析 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 本文将详细介绍在Android应用中添加自定义返回按钮的方法,帮助开发者更好地理解和实现这一功能。通过具体的代码示例和步骤说明,本文旨在为初学者提供清晰的指导,确保他们在开发过程中能够顺利集成返回按钮,提升用户体验。 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • 深入解析 javax.faces.view.ViewDeclarationLanguageWrapper.getWrapped() 方法及其应用实例 ... [详细]
  • 本文介绍了一种专为清洁工人设计的自定义文本烟花效果。通过该功能,用户可以输入特定的感谢或祝福语句,系统将生成绚丽的烟花动画,以表达对清洁工人的敬意和感激之情。该特效不仅美观,还能增强用户的互动体验,提升公共场合的氛围。 ... [详细]
  • 软件开发史上最具影响力的十位编程大师(附图解)
    在软件开发领域,有十位编程大师对行业发展产生了深远影响。本文基于国外知名社区的一项评选,通过图文并茂的形式,详细介绍了这十位杰出人物,包括游戏开发先驱John Carmack等,为读者呈现了他们卓越的技术贡献与创新精神。 ... [详细]
author-avatar
晨雨心秋-济微路唐姜合
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有