热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

springboot整合rabbitmq

本人通过学习rabbitmq,自己编写的demo,希望大家提出宝贵的建议springboot整合rabbitmq

模板项目整体结构:

 

springboot整合rabbitmq

 

1、配置文件:application.yml

server:
  port: 8089
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #支持发布确认
    publisher-confirms: true
    
    #支持发布返回
    publisher-returns: true
    listener:
      simple:
      #采用手动应答
        acknowledge-mode: manual
        #当前监听容器数量
        concurrency: 1
        max-concurrency: 1
        #是否支持重试
        retry:

          enabled: true

 

2、RabbitMQConfig.java 文件  核心

package com.rabbitmqtemplate.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import com.rabbitmqtemplate.callback.MsgSendConfirmCallBack;
import com.rabbitmqtemplate.callback.MsgSendReturnCallback;
import com.rabbitmqtemplate.constants.RabbitMqExchange;
import com.rabbitmqtemplate.constants.RabbitMqQueue;
import com.rabbitmqtemplate.constants.RabbitMqRoutingKey;

/**
 * rabbitmq核心配置
 * @author lishilei
 *
 */
@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    public String host;
    @Value("${spring.rabbitmq.port}")
    public int port;
    @Value("${spring.rabbitmq.username}")
    public String username;
    @Value("${spring.rabbitmq.password}")
    public String password;
    @Value("${spring.rabbitmq.virtual-host}")
    public String virtualHost;
    @Value("${spring.rabbitmq.publisher-confirms}")
    public Boolean  publisherConfirms;

    /**
     * 创建工厂,自动创建的ConnectionFactory不能完成事件的回调
     *
     * @return
     */
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory cOnnectionFactory= new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);//手动应答模式
        return connectionFactory;
    }

    /**
     * RabbitTemplate 发送消息,必须是prototype类型,
     * 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
     *
     * @return
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        
        /**若使用confirm-callback或return-callback,
         * 必须要配置publisherConfirms或publisherReturns为true  
         * 每个rabbitTemplate只能有一个confirm-callback和return-callback
         * */  
        template.setConfirmCallback(msgSendConfirmCallBack());  
        template.setReturnCallback(msgSendReturnCallback());
        
        /**  
         * 使用return-callback时必须设置mandatory为true,
         * 或者在配置中设置mandatory-expression的值为true,
         * 可针对每次请求的消息去确定’mandatory’的boolean值,  
         * 只能在提供’return -callback’时使用,与mandatory互斥
         * */  
         template.setMandatory(true); //保证消息的可靠投递
        return template;
    }

    /**
     * 消息确认机制
     * Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
     * 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
     * 确认并且保证消息被送达,提供了两种方式:发布确认和事务。
     * (两者不可同时使用)在channel为事务时,
     * 不可引入确认模式;同样channel为确认模式下,不可使用事务。
     *
     */
    @Bean  
    public MsgSendConfirmCallBack msgSendConfirmCallBack(){  
        return new MsgSendConfirmCallBack();  
    }

    /**
     * 消息未发送到交换机调用
     * @return
     */
    @Bean
    public MsgSendReturnCallback msgSendReturnCallback(){
        return new MsgSendReturnCallback();
    }
    
    @Bean
    public Queue defaultQueue() {
        return new Queue(RabbitMqQueue.QUEUE, true); // 队列持久
    }

    /**
     * 针对消费者配置 FanoutExchange: 将消息分发到所有的绑定队列,
     * 无routingkey的概念 HeadersExchange
     * :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列
     * TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(RabbitMqExchange.EXCHANGE);
    }

    /**
     * 通过绑定键将队列绑定到指定的交换机上
     * @return
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(defaultQueue()).to(defaultExchange()).with(RabbitMqRoutingKey.ROUTINGKEY);
    }
 

}

3、constants下文件

package com.rabbitmqtemplate.constants;

/**
 * 交换机配置
 * @author lishilei
 *
 */
public interface RabbitMqExchange {
    String EXCHANGE = "default-exchange";
}

package com.rabbitmqtemplate.constants;

/**
 * 消息队列配置
 * @author lishilei
 *
 */
public interface RabbitMqQueue {
    String QUEUE = "default-queue";
}

package com.rabbitmqtemplate.constants;

/**
 * 路由配置
 * @author lishilei
 *
 */
public interface RabbitMqRoutingKey {
    String ROUTINGKEY = "direct-routingkey";
}

 

4、callback

package com.rabbitmqtemplate.callback;

import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;

import lombok.extern.slf4j.Slf4j;

/**
 * 确认消息是否成功发送给交换机
 * @author lishilei
 *
 */
@Slf4j
public class MsgSendConfirmCallBack implements ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(" 回调消息id:" + correlationData);
                      log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);

               throw new RuntimeException("send error " + cause);
        }
    }
}

package com.rabbitmqtemplate.callback;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

import lombok.extern.slf4j.Slf4j;

/**
 * 消息回调
 *
 * @author lishilei
 *
 */
@Slf4j
public class MsgSendReturnCallback implements ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String correlatiOnId= message.getMessageProperties().getCorrelationIdString();
        log.info("消息: {} 发送失败,应答码: {} 原因:{} 交换机:{} 路由键:{}", correlationId, replyCode,
                replyText, exchange, routingKey);

    }
}

 

5、Receiver 消费者

package com.rabbitmqtemplate.consumer;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import com.rabbitmqtemplate.constants.RabbitMqQueue;
import com.rabbitmqtemplate.dto.UserDTO;

import lombok.extern.slf4j.Slf4j;

/**
 * 消费者
 * @author lishilei
 *
 */
@Component
@Slf4j
public class Receiver {

    @RabbitListener(queues = RabbitMqQueue.QUEUE)
    public void process(Message message, Channel channel) throws Exception {

        /* 写入当前对象的二进制流   
        ByteArrayOutputStream bos = new ByteArrayOutputStream();  
        ObjectOutputStream oos = new ObjectOutputStream(bos);  
        oos.writeObject(this);  

                     读出二进制流产生的新对象   
        ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());  
        ObjectInputStream ois = new ObjectInputStream(bis);  
        return ois.readObject(); */
        
        //获取传递的消息
        ByteArrayInputStream bi = new ByteArrayInputStream(message.getBody());
        ObjectInputStream oi = new ObjectInputStream(bi);
        UserDTO  obj = (UserDTO) oi.readObject();
        log.info("消费端接收到消息 : " + obj);
        
        // TODO 添加处理消息的业务逻辑
        
        //手动应答,告诉rabbitmq业务执行完成,消息可以丢弃
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.info("消息处理成功");

    }

}

 

6、Sender 生产者

package com.rabbitmqtemplate.sender;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbitmqtemplate.constants.RabbitMqExchange;
import com.rabbitmqtemplate.constants.RabbitMqRoutingKey;

/**
 * 发送者
 * @author lishilei
 *
 * @param
 */
@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 消息发送
     * @param e
     */
    public void send(E e) {

        String uuid = UUID.randomUUID().toString();
        CorrelationData correlatiOnId= new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMqExchange.EXCHANGE, RabbitMqRoutingKey.ROUTINGKEY, e,correlationId);
    }

}

 

package com.rabbitmqtemplate.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.rabbitmqtemplate.dto.UserDTO;
import com.rabbitmqtemplate.sender.Sender;

 

7、测试controller

/**
 * 测试controller
 * @Controller 和  @ResponseBody 等价 @RestController
 * @author lishilei
 *
 */
@Controller
@RequestMapping("/rabbbit")
public class SendController{  
 
    @Autowired  
    private Sender sender;  
 
    @RequestMapping("/direct")  
    @ResponseBody
    public String direct(){
        UserDTO userDTO = new UserDTO();
        userDTO.setAge(24);
        userDTO.setName("alibaba");
        sender.send(userDTO);
        return "消息处理成功";  
    }  
    
}

 

8、消息体,可自己定义

package com.rabbitmqtemplate.dto;

import java.io.Serializable;

import lombok.Data;

/**
 * 消息体
 * @author lishilei
 *
 */
@Data
public class UserDTO implements Serializable{

    /**
     *
     */
    private static final long serialVersiOnUID= 3623461501945244407L;

    private String name;
    
    private Integer age;

}

9、SpringBoot 工程启动

package com.rabbitmqtemplate;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQApp
{
    public static void main( String[] args )
    {
        SpringApplication.run(RabbitMQApp.class, args);
    }
}

 


 

 

 


 

 

 


 

 

 

 

 

 

 

 


推荐阅读
  • .Net下RabbitMQ发布订阅模式实践
    一、概念AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的 ... [详细]
  • 在RabbitMQ中,消息发布者默认情况下不会接收到关于消息在Broker中状态的反馈,这可能导致消息丢失的问题。为了确保消息的可靠传输与投递,可以采用确认机制(如发布确认和事务模式)来验证消息是否成功抵达Broker,并采取相应的重试策略以提高系统的可靠性。此外,还可以配置消息持久化和镜像队列等高级功能,进一步增强消息的可靠性和高可用性。 ... [详细]
  • 计算 n 叉树中各节点子树的叶节点数量分析 ... [详细]
  • 深入解析 Django 中用户模型的自定义方法与技巧 ... [详细]
  • Android 图像色彩处理技术详解
    本文详细探讨了 Android 平台上的图像色彩处理技术,重点介绍了如何通过模仿美图秀秀的交互方式,利用 SeekBar 实现对图片颜色的精细调整。文章展示了具体的布局设计和代码实现,帮助开发者更好地理解和应用图像处理技术。 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • 在 Android 开发中,通过合理利用系统通知服务,可以显著提升应用的用户交互体验。针对 Android 8.0 及以上版本,开发者需首先创建并注册通知渠道。本文将详细介绍如何在应用中实现这一功能,包括初始化通知管理器、创建通知渠道以及发送通知的具体步骤,帮助开发者更好地理解和应用这些技术细节。 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 如何判断一个度序列能否构成简单图——哈维尔-哈基米算法的应用与解析 ... [详细]
  • 前言: 网上搭建k8s的文章很多,但很多都无法按其说明在阿里云ecs服务器成功搭建,所以我就花了些时间基于自己成功搭建k8s的步骤写了个操作手册,希望对想搭建k8s环境的盆友有所帮 ... [详细]
  • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
  • 顶尖编程语言,无可匹敌的选择
    我常常在想,一个人具备怎样的素质和能力,才称得上高级工程师?估计有不少人会说,“基础过硬、熟练掌握一门编程语言、至少看过一个 ... [详细]
  • SpringCloud之Bus(消息总线)
    说明:关于SpringCloud系列的文章中的代码都在码云上面地址:https:gitee.comzh_0209_javaspringcloud-ali ... [详细]
author-avatar
袁善恩芷恩
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有