延时插件实现篇,参考文章:【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)
一、实现原理
死信队列:DLX,dead-letter-exchange
利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。
我们将消息发送到消息队列中,并设置一个过期时间,该队列没有消费者
消息的过期时间到了之后,由于没有消费者,就会进入死信队列
我们用一个消费者接收死信队列的消息,就能达到延迟消息的目的
二、实现过程
<!-- rabbitmq消息队列 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;&#64;Configuration
public class RabbitMqConfig {public static final String DLX_QUEUE &#61; "dlx_queue";public static final String DLX_EXCHANGE &#61; "dlx_exchange";public static final String DLX_ROUTING_KEY &#61; "dlx_routing_key";public static final String MSG_QUEUE &#61; "msg_queue";public static final String MSG_EXCHANGE &#61; "msg_exchange";public static final String MSG_ROUTING_KEY &#61; "msg_routing_key";&#64;BeanQueue dlxQueue() {return new Queue(DLX_QUEUE, true, false, false);}&#64;BeanDirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE, true, false);}&#64;BeanBinding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}&#64;BeanQueue msgQueue() {Map<String, Object> args &#61; new HashMap<>();args.put("x-message-ttl", 1000 * 10);args.put("x-dead-letter-exchange", DLX_EXCHANGE);args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);return new Queue(MSG_QUEUE, true, false, false, args);}&#64;BeanDirectExchange msgExchange() {return new DirectExchange(MSG_EXCHANGE, true, false);}&#64;BeanBinding msgBinding() {return BindingBuilder.bind(msgQueue()).to(msgExchange()).with(MSG_ROUTING_KEY);}
}
分别创建了私信队列和普通消息队列&#xff0c;普通消息队列设置了消息过期时间为 10s&#xff0c;设置了死信交换机、死信 routing_key&#xff0c;消息过期之后就能进入了死信队列中。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;&#64;Slf4j
&#64;Component
public class DlxConsumer {&#64;RabbitListener(queues &#61; RabbitMqConfig.DLX_QUEUE)public void handle(String message) {log.info("------------------收到消息&#xff1a;" &#43; message);}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;&#64;Slf4j
&#64;Component
public class MsgProducer {&#64;Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String message) {try {rabbitTemplate.convertAndSend(RabbitMqConfig.MSG_EXCHANGE, RabbitMqConfig.MSG_ROUTING_KEY, message);log.info("-------------------消息发送成功");} catch (Exception e) {log.error("-------------------消息发送失败");}}
}
三、测试
import com.asurplus.common.rabbitmq.MsgProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;&#64;RestController
public class TestController {&#64;Autowiredprivate MsgProducer msgProducer;&#64;GetMapping("test")public String test() {msgProducer.sendMsg("今天天气好好");return "success";}
}
访问接口&#xff1a;
localhost:8080/test
消息发送成功&#xff0c;10s 后&#xff1a;
收到了消息&#xff0c;证明我的延时消息已经成功
如您在阅读中发现不足&#xff0c;欢迎留言&#xff01;&#xff01;&#xff01;