热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

SpringBoot集成RabbitMQ的方法(死信队列)

这篇文章主要介绍了SpringBoot集成RabbitMQ的方法(死信队列),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期

场景

1.小时进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

安装MQ

使用docker方式安装,选择带mangement的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost: 15672,默认账号密码guest/guest

项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqConfig {

  //time
  @Value("${spring.df.buffered.min:120}")
  private int springdfBufferedTime;

  @Value("${spring.df.high-buffered.min:5}")
  private int springdfHighBufferedTime;

  @Value("${spring.df.low-buffered.min:120}")
  private int springdfLowBufferedTime;

  // 30min Buffered Queue
  @Value("${spring.df.queue:spring-df-buffered-queue}")
  private String springdfBufferedQueue;

  @Value("${spring.df.topic:spring-df-buffered-topic}")
  private String springdfBufferedTopic;

  @Value("${spring.df.route:spring-df-buffered-route}")
  private String springdfBufferedRouteKey;

  // 5M Buffered Queue
  @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
  private String springdfHighBufferedQueue;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  // High Queue
  @Value("${spring.df.high.queue:spring-df-high-queue}")
  private String springdfHighQueue;

  @Value("${spring.df.high.topic:spring-df-high-topic}")
  private String springdfHighTopic;

  @Value("${spring.df.high.route:spring-df-high-route}")
  private String springdfHighRouteKey;

  // 2H Low Buffered Queue
  @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
  private String springdfLowBufferedQueue;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  // Low Queue
  @Value("${spring.df.low.queue:spring-df-low-queue}")
  private String springdfLowQueue;

  @Value("${spring.df.low.topic:spring-df-low-topic}")
  private String springdfLowTopic;

  @Value("${spring.df.low.route:spring-df-low-route}")
  private String springdfLowRouteKey;


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
  Queue springdfBufferedQueue() {
    int bufferedTime = 1000 * 60 * springdfBufferedTime;
    return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic, springdfHighBufferedRouteKey, bufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
  Queue springdfHighBufferedQueue() {
    int highBufferedTime = 1000 * 60 * springdfHighBufferedTime;
    return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic, springdfHighRouteKey, highBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
  Queue springdfHighQueue() {
    return new Queue(springdfHighQueue, true);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
  Queue springdfLowBufferedQueue() {
    int lowBufferedTime = 1000 * 60 * springdfLowBufferedTime;
    return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic, springdfLowRouteKey, lowBufferedTime);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
  Queue springdfLowQueue() {
    return new Queue(springdfLowQueue, true);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
  TopicExchange springdfBufferedTopic() {
    return new TopicExchange(springdfBufferedTopic);
  }

  @Bean
  Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
    return BindingBuilder.bind(springdfBufferedQueue).to(springdfBufferedTopic).with(springdfBufferedRouteKey);
  }


  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
  TopicExchange springdfHighBufferedTopic() {
    return new TopicExchange(springdfHighBufferedTopic);
  }

  @Bean
  Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
    return BindingBuilder.bind(springdfHighBufferedQueue).to(springdfHighBufferedTopic).with(springdfHighBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
  TopicExchange springdfHighTopic() {
    return new TopicExchange(springdfHighTopic);
  }

  @Bean
  Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
    return BindingBuilder.bind(springdfHighQueue).to(springdfHighTopic).with(springdfHighRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
  TopicExchange springdfLowBufferedTopic() {
    return new TopicExchange(springdfLowBufferedTopic);
  }

  @Bean
  Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
    return BindingBuilder.bind(springdfLowBufferedQueue).to(springdfLowBufferedTopic).with(springdfLowBufferedRouteKey);
  }

  @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
  TopicExchange springdfLowTopic() {
    return new TopicExchange(springdfLowTopic);
  }

  @Bean
  Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
    return BindingBuilder.bind(springdfLowQueue).to(springdfLowTopic).with(springdfLowRouteKey);
  }


  @Bean
  SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                       MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer cOntainer= new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(springdfHighQueue, springdfLowQueue);
    container.setMessageListener(listenerAdapter);
    return container;
  }

  @Bean
  MessageListenerAdapter listenerAdapter(IntegrationReceiver receiver) {


    MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
    adapter.setDefaultListenerMethod("receive");
    Map queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put(springdfHighQueue, "springdfHighReceive");
    queueOrTagToMethodName.put(springdfLowQueue, "springdfLowReceive");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    return adapter;

  }


  private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
    Map args = new HashMap<>();
    args.put("x-dead-letter-exchange", topic);
    args.put("x-dead-letter-routing-key", routeKey);
    args.put("x-message-ttl", bufferedTime);
    // 是否持久化
    boolean durable = true;
    // 仅创建者可以使用的私有队列,断开后自动删除
    boolean exclusive = false;
    // 当所有消费客户端连接断开后,是否自动删除队列
    boolean autoDelete = false;

    return new Queue(queueName, durable, exclusive, autoDelete, args);
  }
}

消费者配置

package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

public class MqReceiver {

  private static Logger logger = LoggerFactory.getLogger(MqReceiver.class);

  @Value("${high-retry:5}")
  private int highRetry;

  @Value("${low-retry:5}")
  private int lowRetry;

  @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
  private String springdfHighBufferedTopic;

  @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
  private String springdfHighBufferedRouteKey;

  @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
  private String springdfLowBufferedTopic;

  @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
  private String springdfLowBufferedRouteKey;

  private final RabbitTemplate rabbitTemplate;
  @Autowired
  public MqReceiver(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  public void receive(Object message) {
    if (logger.isInfoEnabled()) {
      logger.info("default receiver: " + message);
    }
  }

  /**
   * 消息从初始队列进入5分钟的高速缓冲队列
   * @param message
   */
  public void highReceiver(Object message){
    ObjectMapper mapper = new ObjectMapper();
    Map msg = mapper.convertValue(message, Map.class);

    try{
      logger.info("这里做消息处理...");
    }catch (Exception e){
      int times = msg.get("times") == null &#63; 0 : (int) msg.get("times");
      if (times 

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文讲述了如何通过代码在Android中更改Recycler视图项的背景颜色。通过在onBindViewHolder方法中设置条件判断,可以实现根据条件改变背景颜色的效果。同时,还介绍了如何修改底部边框颜色以及提供了RecyclerView Fragment layout.xml和项目布局文件的示例代码。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Webmin远程命令执行漏洞复现及防护方法
    本文介绍了Webmin远程命令执行漏洞CVE-2019-15107的漏洞详情和复现方法,同时提供了防护方法。漏洞存在于Webmin的找回密码页面中,攻击者无需权限即可注入命令并执行任意系统命令。文章还提供了相关参考链接和搭建靶场的步骤。此外,还指出了参考链接中的数据包不准确的问题,并解释了漏洞触发的条件。最后,给出了防护方法以避免受到该漏洞的攻击。 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
author-avatar
伤心的海2012_626
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有