热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

SpringBoot整合RabbitMQ消息队列的完整步骤

这篇文章主要给大家介绍了关于SpringBoot整合RabbitMQ消息队列的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

SpringBoot整合RabbitMQ

主要实现RabbitMQ以下三种消息队列:

  • 简单消息队列(演示direct模式)
  • 基于RabbitMQ特性的延时消息队列
  • 基于RabbitMQ相关插件的延时消息队列

公共资源

1. 引入pom依赖

 
     org.springframework.boot
     spring-boot-starter-amqp

2. 配置yml文件

基于上篇《RabbitMQ安装与配置》实现的情况下,进行基础配置。

spring:
  rabbitmq:
    host: 121.5.168.31
    port: 5672    # 默认可省略
    virtual-host: /*** # 虚拟主机
    username: *** # 用户名
    password: *** # 用户密码
    # 开启投递成功回调 P -> Exchange
    publisher-confirm-type: correlated
    # 开启投递消息到队列失败回调 Exchange -> Queue
    publisher-returns: true
    # 开启手动ACK确认模式 Queue -> C
    listener:
      simple:
        acknowledge-mode: manual # 代表手动ACK确认
        # 一些基本参数的设置
        concurrency: 3
        prefetch: 15
        retry:
          enabled: true
          max-attempts: 5
        max-concurrency: 10

3. 公共Constants类

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/23 15:28
 */

public class Constants {

    /**
     * 第一个配置Queue,Exchange,Key(非注解方式)
     */
    public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";
    public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";
    public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";

    /**
     * 第二个配置Queue,Exchange,Key(注解方式)
     */
    public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";
    public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";
    public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";


    //************************************延时消息队列配置信息**************************
    /**
     * 延时队列信息配置
     */
    public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";
    public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";
    public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";

    /**
     * 死信队列
     */
    public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";
    public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";
    public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";

    //**************************************延时消息队列配置信息(插件版)******************************
    /**
     * 新延时队列信息配置
     */
    public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";
    public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";
    public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";

}

简单消息队列(direct模式)

4. RabbitTemplate模板配置

主要定义消息投递Exchange成功回调函数和消息从Exchange投递到消息队列失败的回调函数。

package com.topsun.rabbit;

import com.sun.org.apache.xpath.internal.operations.Bool;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/23 14:17
 */
@Configuration
public class RabbitConfig {

    private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);


    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(Boolean.TRUE);
        // 设置序列化机制
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // 消息由投递到Exchange中时触发的回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                logger.info("消息发送到Exchange情况反馈:唯一标识:correlatiOnData={},消息确认:ack={},原因:cause={}",
                        correlationData, ack, cause)
        );
        // 消息由Exchange发送到Queue时失败触发的回调
        rabbitTemplate.setReturnsCallback((returnedMessage) -> {
            // 如果是插件形式实现的延时队列,则直接返回
            // 原因: 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列,从而实现延时队列的操作
            if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {
                return;
            }
            logger.warn("消息由Exchange发送到Queue时失败:message={},replyCode={},replyText={},exchange={},rountingKey={}",
                    returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),
                    returnedMessage.getExchange(), returnedMessage.getRoutingKey());
        });
        return rabbitTemplate;
    }

    //*******************************************直接配置绑定关系*****************************************
    /**
     * 声明队列
     *
     * @return
     */
    @Bean
    public Queue horseQueue() {
        return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);
    }

    /**
     * 声明指定模式交换机
     *
     * @return
     */
    @Bean
    public DirectExchange horseExchange() {
        return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * 绑定交换机,队列,路由Key
     *
     * @return
     */
    @Bean
    public Binding horseBinding() {
        return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);
    }

}

5. 定义消息监听器

基于 @RabbitListenerzi注解,实现自定义消息监听器。主要有两种实现方式:

  • 如果在配置类中声明了Queue、Excehange以及他们直接的绑定,这里直接指定队列进行消息监听
  • 如果前面什么也没做,这里可以直接用注解的方式进行绑定实现消息监听
package com.topsun.rabbit;

import com.rabbitmq.client.Channel;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/23 14:58
 */

@Component
public class MsgListener {

    private static Logger logger = LoggerFactory.getLogger(MsgListener.class);

    /**
     * 配置类中已经完成绑定,这里直接根据队列值接收
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)
    public void customListener(Message message, Channel channel, String msg) {
        // 获取每条消息唯一标识(用于手动ACK确认)
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> customListener接收" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

    /**
     * 根据注解的形式进行绑定接收
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),
            exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclaratiOnExceptions= "true"),
            key = {Constants.HORSE_ANNOTATION_KEY}
    ))
    public void annotationListener(Message message, Channel channel, String msg) {
        // 获取每条消息唯一标识(用于手动ACK确认)
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> annotationListener接收" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

}


6. 测试接口

这里发送100条消息:

  • 奇数条到非注解方式的消息监听器
  • 偶数条到注解式消息监听器
@GetMapping("/rabbit")
    public void sendMsg() {
        for (int i = 1; i <= 100; i++) {
            String msg = "第" + i + "条消息";
            logger.info("==> 发送" + msg);
            if (i % 2 == 1) {
                rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));
            } else {
                rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));
            }
        }
    }

结果:自行测试过,非常成功:smile::smile::smile:

延时消息队列

原理:生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

7. 配置绑定相关信息

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/24 14:22
 */

@Configuration
public class DelayRabbitConfig {

    private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);

    /**
     * 声明延时队列交换机
     *
     * @return
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * 声明死信队列交换机
     *
     * @return
     */
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * 声明延时队列 延时10s(单位:ms),并将延时队列绑定到对应的死信交换机和路由Key
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        Map args = new HashMap<>(3);
        // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);
        // x-message-ttl  声明队列的TTL(过期时间)
        // 可以在这里直接写死,也可以进行动态的设置(推荐动态设置)
        // args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     *
     * @return
     */
    @Bean
    public Queue deadQueue() {
        return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);
    }


    /**
     * 延时队列绑定管理
     *
     * @return
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);
    }

    /**
     * 死信队列绑定管理
     *
     * @return
     */
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);
    }

    //**********************************延时消息队列配置信息(插件版)************************************

    @Bean
    public Queue pluginQueue() {
        return new Queue(Constants.HORSE_PLUGIN_QUEUE);
    }

   /**
     * 设置延时队列的交换机,必须是 CustomExchange 类型交换机
     * 参数必须,不能改变
     * @return
     */
    @Bean
    public CustomExchange customPluginExchange() {
        Map args = new HashMap<>(2);
        args.put("x-delayed-type", "direct");
        return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);
    }

    @Bean
    public Binding pluginBinding() {
        return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();
    }

}

8. 定义延时监听器

/**
 * @author Mr.Horse
 * @version 1.0
 * @description: {description}
 * @date 2021/4/24 14:51
 */
@Component
public class DelayMsgListener {

    private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);


    /**
     * 监听死信队列
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)
    public void consumeDeadListener(Message message, Channel channel, String msg) {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> consumeDeadListener接收" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

    /**
     * 监听延时队列(插件版)
     *
     * @param message
     * @param channel
     * @param msg
     */
    @RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)
    public void consumePluginListener(Message message, Channel channel, String msg) {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            logger.info(" ==> consumePluginListener" + msg);
            // 手动ACK确认
            channel.basicAck(tag, false);
        } catch (IOException e) {
            logger.error(" ==> 消息接收失败: {}", tag);
        }
    }

}

9. 测试接口

   // 基于特性的延时队列
	@GetMapping("/delay/rabbit")
    public void delayMsg(@RequestParam("expire") Long expire) {
        for (int i = 1; i <= 10; i++) {
            String msg = "第" + i + "条消息";
            logger.info("==> 发送" + msg);
            // 这里可以动态的设置过期时间
            rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,
                    message -> {
                        message.getMessageProperties().setExpiration(String.valueOf(expire));
                        return message;
                    },
                    new CorrelationData(String.valueOf(i)));
        }
    }

	// 基于插件的延时队列
    @GetMapping("/delay/plugin")
    public void delayPluginMsg(@RequestParam("expire") Integer expire) {
        for (int i = 1; i <= 10; i++) {
            String msg = "第" + i + "条消息";
            logger.info("==> 发送" + msg);
            // 动态设置过期时间
            rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setDelay(expire);
                return message;
            }, new CorrelationData(String.valueOf(i)));

        }
    }

结果:你懂的:scream_cat::scream_cat::scream_cat:

RabbitMQ的基础使用演示到此结束。

总结

到此这篇关于SpringBoot整合RabbitMQ消息队列的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ消息队列内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


推荐阅读
  • 本文总结了在使用Ionic 5进行Android平台APK打包时遇到的问题,特别是针对QRScanner插件的改造。通过详细分析和提供具体的解决方法,帮助开发者顺利打包并优化应用性能。 ... [详细]
  • 本文探讨了如何在编程中正确处理包含空数组的 JSON 对象,提供了详细的代码示例和解决方案。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • 本文详细介绍了中央电视台电影频道的节目预告,并通过专业工具分析了其加载方式,确保用户能够获取最准确的电视节目信息。 ... [详细]
  • Composer Registry Manager:PHP的源切换管理工具
    本文介绍了一个用于Composer的源切换管理工具——Composer Registry Manager。该项目旨在简化Composer包源的管理和切换,避免与常见的CRM系统混淆,并提供了详细的安装和使用指南。 ... [详细]
  • 本文详细介绍了Git分布式版本控制系统中远程仓库的概念和操作方法。通过具体案例,帮助读者更好地理解和掌握如何高效管理代码库。 ... [详细]
  • 最近团队在部署DLP,作为一个技术人员对于黑盒看不到的地方还是充满了好奇心。多次咨询乙方人员DLP的算法原理是什么,他们都以商业秘密为由避而不谈,不得已只能自己查资料学习,于是有了下面的浅见。身为甲方,虽然不需要开发DLP产品,但是也有必要弄明白DLP基本的原理。俗话说工欲善其事必先利其器,只有在懂这个工具的原理之后才能更加灵活地使用这个工具,即使出现意外情况也能快速排错,越接近底层,越接近真相。根据DLP的实际用途,本文将DLP检测分为2部分,泄露关键字检测和近似重复文档检测。 ... [详细]
  • 本文介绍了如何利用npm脚本和concurrently工具,实现本地开发环境中多个监听服务的同时启动,包括HTTP服务、自动刷新、Sass和ES6支持。 ... [详细]
  • 本文探讨了在通过 API 端点调用时,使用猫鼬(Mongoose)的 findOne 方法总是返回 null 的问题,并提供了详细的解决方案和建议。 ... [详细]
  • 本文详细介绍如何在VSCode中配置自定义代码片段,使其具备与IDEA相似的代码生成快捷键功能。通过具体的Java和HTML代码片段示例,展示配置步骤及效果。 ... [详细]
  • 在网页开发中,页面加载速度是一个关键的用户体验因素。为了提升加载效率,避免在PageLoad事件中进行大量数据绑定操作,可以采用异步加载和特定控件来优化页面加载过程。 ... [详细]
  • 探讨在循环中调用$.post()时,回调函数为何会在循环结束后才开始执行,并提供解决方案和优化建议。 ... [详细]
  • 深入解析JMeter中的JSON提取器及其应用
    本文详细介绍了如何在JMeter中使用JSON提取器来获取和处理API响应中的数据。特别是在需要将一个接口返回的数据作为下一个接口的输入时,JSON提取器是一个非常有用的工具。 ... [详细]
  • 本文探讨了在 Vue 2.0 项目中使用 Axios 获取数据时可能出现的错误,并提供详细的解决方案和最佳实践。 ... [详细]
  • 探讨如何正确使用 Fetch API 进行参数传递,分析不同写法的差异及解决方案。 ... [详细]
author-avatar
mobiledu2502884243
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有