主要有2种方式,如果同时指定了Message TTL
和Queue TTL
,则优先较小的那一个:
-
指定一条消息的过期时间。
-
给队列设置消息过期时间,队列中的所有消息都有同样的过期时间。
队列设置的方式
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class DirectRabbitConfig {//队列 起名&#xff1a;TestDirectQueue&#64;Beanpublic Queue TestDirectQueue() {// durable:是否持久化,默认是false,持久化队列&#xff1a;会被存储在磁盘上&#xff0c;当消息代理重启时仍然存在&#xff0c;暂存队列&#xff1a;当前连接有效// exclusive:默认也是false&#xff0c;只能被当前创建的连接使用&#xff0c;而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除&#xff0c;当没有生产者或者消费者使用此队列&#xff0c;该队列会自动删除。Map map &#61; new HashMap<>();// 队列中的消息未被消费则10秒后过期map.put("x-message-ttl", 10000); return new Queue("TestDirectQueue", true, false, false, map);}//Direct交换机 起名&#xff1a;TestDirectExchange&#64;BeanDirectExchange TestDirectExchange() {return new DirectExchange("TestDirectExchange", true, false);}//绑定 将队列和交换机绑定, 并设置用于匹配键&#xff1a;TestDirectRouting&#64;BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}}
声明队列时设置1个x-message-ttl
的属性&#xff0c;并设置过期时间&#xff0c;凡是推送到该队列中的所有消息&#xff0c;都会有一个10秒后过期的属性。
可以看到创建的队列有TTL
的特性&#xff0c;表示该队列中的消息会自动过期。
单独设置某条消息的方式
import cn.huawei.rabbitmqtest1.pojo.User;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.UUID;&#64;RestController
public class SendMessageController {&#64;AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法&#64;GetMapping("/sendDirectMessage")public void sendDirectMessage() {MessageProperties messageProperties &#61; new MessageProperties();// 设置过期时间&#xff0c;单位&#xff1a;毫秒messageProperties.setExpiration("30000"); for (int i &#61; 1; i <&#61; 50; i&#43;&#43;) {//这个参数是用来做消息的唯一标识//发布消息时使用&#xff0c;存储在消息的headers中CorrelationData correlationData &#61; new CorrelationData(UUID.randomUUID().toString());User user &#61; new User(i &#43; "", "陈四 " &#43; i);Message message &#61; new Message(JSON.toJSONString(user).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", message, correlationData);}}
}
消息推送到队列后&#xff0c;如果指定时间内没有被消费&#xff0c;则会自动过期。
注意&#xff1a;
RabbitMQ
只会对队列头部的消息进行过期淘汰。如果单独给消息设置TTL&#xff0c;先入队列的消息过期时间如果设置比较长&#xff0c;后入队列的设置时间比较短。会造成消息不会及时地过期淘汰&#xff0c;导致消息的堆积。