热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ在SpringBoot中的使用

SpringBoot应用可以完成自动配置及依赖注入——可以通过Spring直接提供与MQ的连接对象 6.1消息生产者创建SpringBoot应用,添加依赖配置application

SpringBoot应用可以完成自动配置及依赖注入——可以通过Spring直接提供与MQ的连接对象

 

6.1 消息生产者

  • 创建SpringBoot应用,添加依赖

  • RabbitMQ在SpringBoot中的使用
  • 配置application.yml

  

server:
  port: 9001
spring:
  application:
    name: producer
  rabbitmq:
    host: 47.96.11.185
    port: 5672
    virtual-host: host1
    username: ytao
    password: admin123

 

 

发送消息

@Service
public class TestService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String msg){

        //1. 发送消息到队列
        amqpTemplate.convertAndSend("queue1",msg);

        //2. 发送消息到交换机(订阅交换机)
        amqpTemplate.convertAndSend("ex1","",msg);

        //3. 发送消息到交换机(路由交换机)
        amqpTemplate.convertAndSend("ex2","a",msg);
        
    }

}

 

 

6.2 消息消费者

  • 创建项目添加依赖

  • 配置yml

  • 接收消息

@Service
//@RabbitListener(queues = {"queue1","queue2"})
@RabbitListener(queues = "queue1")
public class ReceiveMsgService {

    @RabbitHandler
    public void receiveMsg(String msg){
        System.out.println("接收MSG:"+msg);
    }

    //@RabbitHandler
    //public void receiveMsg(byte[] bs){
    //
    //}

}

 

 

 

二、使用RabbitMQ传递对象

RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息

2.1 使用序列化对象

  

  要求:

  •   传递的对象实现序列化接口

  • 传递的对象的包名、类名、属性名必须一致

 

 

消息提供者

@Service
public class MQService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods){
        //消息队列可以发送 字符串、字节数组、序列化对象
        amqpTemplate.convertAndSend("","queue1",goods);
    }

}

 

消息消费者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(Goods goods){
        System.out.println("Goods---"+goods);
    }

}

2.2 使用序列化字节数组

要求:

  • 传递的对象实现序列化接口

  • 传递的对象的包名、类名、属性名必须一致

 

 

消息提供者

@Service
public class MQService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods){
        //消息队列可以发送 字符串、字节数组、序列化对象
        byte[] bytes = SerializationUtils.serialize(goods);
        amqpTemplate.convertAndSend("","queue1",bytes);
    }

}

 

消息消费者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(byte[] bs){
        Goods goods = (Goods) SerializationUtils.deserialize(bs);
        System.out.println("byte[]---"+goods);
    }

}

 

2.3 使用JSON字符串传递

要求:对象的属性名一直

  • 消息提供者

@Service
public class MQService {

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendGoodsToMq(Goods goods) throws JsonProcessingException {
        //消息队列可以发送 字符串、字节数组、序列化对象
        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(goods);
        amqpTemplate.convertAndSend("","queue1",msg);
    }

}

 

消息消费者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

    @RabbitHandler
    public void receiveMsg(String msg) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        Goods goods = objectMapper.readValue(msg,Goods.class);
        System.out.println("String---"+msg);
    }
}

 

三、基于Java的交换机与队列创建

我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过Java代码来完成创建

3.1 普通Maven项目交换机及队列创建

  • 使用Java代码新建队列

  • //1.定义队列 (使用Java代码在MQ中新建一个队列)
    //参数1:定义的队列名称
    //参数2:队列中的数据是否持久化(如果选择了持久化)
    //参数3: 是否排外(当前队列是否为当前连接私有)
    //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
    //参数5:设置当前队列的参数
    channel.queueDeclare("queue7",false,false,false,null);

     

  • 新建交换机
  • //定义一个“订阅交换机”
    channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
    //定义一个“路由交换机”
    channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);

     

  • 绑定队列到交换机
  • //绑定队列
    //参数1:队列名称
    //参数2:目标交换机
    //参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key
    channel.queueBind("queue7","ex4","k1");
    channel.queueBind("queue8","ex4","k2");

     

 

3.2 SpringBoot应用中通过配置完成队列的创建

@Configuration
public class RabbitMQConfiguration {

    //声明队列
    @Bean
    public Queue queue9(){
        Queue queue9 = new Queue("queue9");
        //设置队列属性
        return queue9;
    }
    @Bean
    public Queue queue10(){
        Queue queue10 = new Queue("queue10");
        //设置队列属性
        return queue10;
    }

    //声明订阅模式交换机
    @Bean
    public FanoutExchange ex5(){
        return new FanoutExchange("ex5");
    }

    //声明路由模式交换机
    @Bean
    public DirectExchange ex6(){
        return new DirectExchange("ex6");
    }

    //绑定队列
    @Bean
    public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
        return BindingBuilder.bind(queue9).to(ex6).with("k1");
    }
    @Bean
    public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
        return BindingBuilder.bind(queue10).to(ex6).with("k2");
    }
}

 

 

四、消息的可靠性

4.1 RabbitMQ事务

当在消息发送过程中添加了事务,处理效率降低几十倍甚至上百倍

channel.txSelect();  //开启事务
try{
    channel.basicPublish("ex4", "k1", null, msg.getBytes());
    System.out.println("发送:" + msg);
    channel.txCommit(); //提交事务
}catch (Exception e){
    channel.txRollback(); //事务回滚
}

 

4.2 RabbitMQ消息确认和return机制

RabbitMQ在SpringBoot中的使用

 

 

消息确认机制:确认消息提供者是否成功发送消息到交换机

return机制:确认消息是否成功的从交换机分发到队列

 

 

 

4.2.1 普通Maven项目的消息确认
  • 普通confirm方式

  • //1.发送消息之前开启消息确认
    channel.confirmSelect();
    
    channel.basicPublish("ex1", "a", null, msg.getBytes());
    
    //2.接收消息确认
    boolean b = channel.waitForConfirms(); 
    
    System.out.println("发送:" +(b?"成功":"失败"));

     

  • 批量confirm方式
  • //1.发送消息之前开启消息确认
    channel.confirmSelect();
    
    //2.批量发送消息
    for (int i=0 ; i<10 ; i++){
        channel.basicPublish("ex1", "a", null, msg.getBytes());
    }
    
    //3.接收批量消息确认:发送的所有消息中,如果有一条是失败的,则所有消息发送直接失败,抛出IO异常
    boolean b = channel.waitForConfirms(); 

     

  • 异步confirm方式
  • //发送消息之前开启消息确认
    channel.confirmSelect();
    
    //批量发送消息
    for (int i=0 ; i<10 ; i++){
        channel.basicPublish("ex1", "a", null, msg.getBytes());
    }
    
    //假如发送消息需要10s,waitForConfirms会进入阻塞状态
    //boolean b = channel.waitForConfirms();
    
    //使用监听器异步confirm
    channel.addConfirmListener(new ConfirmListener() {
        //参数1: long l  返回消息的表示
        //参数2: boolean b 是否为批量confirm
        public void handleAck(long l, boolean b) throws IOException {
            System.out.println("~~~~~消息成功发送到交换机");
        }
        public void handleNack(long l, boolean b) throws IOException {
            System.out.println("~~~~~消息发送到交换机失败");
        }
    });

     

  

 

4.2.2 普通Maven项目的return机制
  • 添加return监听器

  • 发送消息是指定第三个参数为true

  • 由于监听器监听是异步处理,所以在消息发送之后不能关闭channel

String msg = "Hello HuangDaoJun!";
Connection connection = ConnectionUtil.getConnection();     //相当于JDBC操作的数据库连接
Channel channel = connection.createChannel();               //相当于JDBC操作的statement

//return机制:监控交换机是否将消息分发到队列
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException {
        //如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况)
        System.out.println("*****"+i);  //标识
        System.out.println("*****"+s);  //
        System.out.println("*****"+s1); //交换机名
        System.out.println("*****"+s2); //交换机对应的队列的key
        System.out.println("*****"+new String(bytes));  //发送的消息
    }
});

//发送消息
//channel.basicPublish("ex2", "c", null, msg.getBytes());
channel.basicPublish("ex2", "c", true, null, msg.getBytes());

 

 

4.3 在SpringBoot应用实现消息确认与return监听

4.3.1 配置application.yml,开启消息确认和return监听
spring:
  rabbitmq:
    publisher-confirm-type: simple  ## 开启消息确认模式
    publisher-returns: true        ##使用return监听机制

 

4.3.2 创建confirm和return监听

@Component
public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class);

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        //此方法用于监听消息确认结果(消息是否发送到交换机)
        if(b){
            logger.info("-------消息成功发送到交换机");
        }else{
            logger.warn("-------消息发送到交换机失败");
        }
    }

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        //此方法用于return监听(当交换机分发消息到队列失败时执行)
        logger.warn("~~~~~~~交换机分发消息到队列失败");
    }
}

 

 

 

 

 

五、延迟机制

5.1 延迟队列

  • 延迟队列——消息进入到队列之后,延迟指定的时间才能被消费者消费

  • AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能

  • TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间

 

RabbitMQ在SpringBoot中的使用

 

 

  • 在创建队列的时候可以设置队列的存活时间,当消息进入到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;

  • 创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;

  • 当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列

 

5.2 使用延迟队列实现订单支付监控

5.2.1 实现流程图

 

RabbitMQ在SpringBoot中的使用

 

 

 

 

 

5.2.2 创建交换机和队列

RabbitMQ在SpringBoot中的使用

 

 

2.创建消息队列

 

RabbitMQ在SpringBoot中的使用

 

 

 

3.创建死信队列

RabbitMQ在SpringBoot中的使用

 

 

 

4.队列绑定

 

RabbitMQ在SpringBoot中的使用


推荐阅读
  • 本文作为“实现简易版Spring系列”的第五篇,继前文深入探讨了Spring框架的核心技术之一——控制反转(IoC)之后,将重点转向另一个关键技术——面向切面编程(AOP)。对于使用Spring框架进行开发的开发者来说,AOP是一个不可或缺的概念。了解AOP的背景及其基本原理,对于掌握这一技术至关重要。本文将通过具体示例,详细解析AOP的实现机制,帮助读者更好地理解和应用这一技术。 ... [详细]
  • Ceph API微服务实现RBD块设备的高效创建与安全删除
    本文旨在实现Ceph块存储中RBD块设备的高效创建与安全删除功能。开发环境为CentOS 7,使用 IntelliJ IDEA 进行开发。首先介绍了 librbd 的基本概念及其在 Ceph 中的作用,随后详细描述了项目 Gradle 配置的优化过程,确保了开发环境的稳定性和兼容性。通过这一系列步骤,我们成功实现了 RBD 块设备的快速创建与安全删除,提升了系统的整体性能和可靠性。 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • Spring Batch 异常处理与任务限制优化策略 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • Spring Boot 实战(一):基础的CRUD操作详解
    在《Spring Boot 实战(一)》中,详细介绍了基础的CRUD操作,涵盖创建、读取、更新和删除等核心功能,适合初学者快速掌握Spring Boot框架的应用开发技巧。 ... [详细]
  • 深入解析JWT的实现与应用
    本文深入探讨了JSON Web Token (JWT) 的实现机制及其应用场景。JWT 是一种基于 RFC 7519 标准的开放性认证协议,用于在各方之间安全地传输信息。文章详细分析了 JWT 的结构、生成和验证过程,并讨论了其在现代 Web 应用中的实际应用案例,为开发者提供了全面的理解和实践指导。 ... [详细]
  • 在 Android 开发中,通过合理利用系统通知服务,可以显著提升应用的用户交互体验。针对 Android 8.0 及以上版本,开发者需首先创建并注册通知渠道。本文将详细介绍如何在应用中实现这一功能,包括初始化通知管理器、创建通知渠道以及发送通知的具体步骤,帮助开发者更好地理解和应用这些技术细节。 ... [详细]
  • 当前,众多初创企业对全栈工程师的需求日益增长,但市场中却存在大量所谓的“伪全栈工程师”,尤其是那些仅掌握了Node.js技能的前端开发人员。本文旨在深入探讨全栈工程师在现代技术生态中的真实角色与价值,澄清对这一角色的误解,并强调真正的全栈工程师应具备全面的技术栈和综合解决问题的能力。 ... [详细]
  • 全面解析Java虚拟机:内存模型深度剖析 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • JVM参数设置与命令行工具详解
    JVM参数配置与命令行工具的深入解析旨在优化系统性能,通过合理设置JVM参数,确保在高吞吐量的前提下,有效减少垃圾回收(GC)的频率,进而降低系统停顿时间,提升服务的稳定性和响应速度。此外,本文还将详细介绍常用的JVM命令行工具,帮助开发者更好地监控和调优JVM运行状态。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • 在Spring框架中,基于Schema的异常通知与环绕通知的实现方法具有重要的实践价值。首先,对于异常通知,需要创建一个实现ThrowsAdvice接口的通知类。尽管ThrowsAdvice接口本身不包含任何方法,但开发者需自定义方法来处理异常情况。此外,环绕通知则通过实现MethodInterceptor接口来实现,允许在方法调用前后执行特定逻辑,从而增强功能或进行必要的控制。这两种通知机制的结合使用,能够有效提升应用程序的健壮性和灵活性。 ... [详细]
author-avatar
cfncjl_130
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有