热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

springboot+rabbitmq整合示例

几个概念说明:Broker:简单来说就是消息队列服务器实体。Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue:消息队列载体,每个消息都会被投入到一个或多个队列。Bindi

几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

交换机路由的几种类型:
Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.
Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key.
Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

安装Erland
http://www.erlang.org/downloads

安装RabbitMQ
https://www.rabbitmq.com/download.html

开启RabbitMQ服务
执行rabbitmq-plugins enable rabbitmq_management命令,开启Web管理插件
重启RabbitMQ服务

Web地址
http://localhost:15672/
默认用户名和密码:guest

 

一、引入springboot和rabbitmq的依赖


<dependency>
    <groupId>org.quartz-schedulergroupId>
    <artifactId>quartzartifactId>
    <version>2.3.0version>
dependency>
View Code

二、新增application.properties对rabbimq的配置信息

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=116.255.193.36
spring.rabbitmq.port=5672
spring.rabbitmq.username=scrm
spring.rabbitmq.password=scrm
spring.rabbitmq.publisher-cOnfirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=scrm
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#最小消息监听线程数
spring.rabbitmq.listener.cOncurrency=2  
#最大消息监听线程数
spring.rabbitmq.listener.max-cOncurrency=2 
View Code

三、公共设置类

1、队列、消息交换机,路由关键字公共枚举类

package cloud.app.prod.home.rabbitmq;

/**
 * Author : YongBo Xie 
* File Name: RabbitMqEnum.java
* Created Date: 2018年3月28日 上午10:32:02
* Modified Date: 2018年3月28日 上午10:32:02
* Version: 1.0
*/ public class RabbitMqEnum { /** * describe: 定义队列名称 **/ public enum QueueName { MARKETING_ACTIVITIE_QUEUE("marketingActivitieQueue", "营销活动队列"); private String code; private String name; QueueName(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定义交换机 **/ public enum Exchange { DIRECT_EXCHANGE("directExchange", "直连交换机"), FANOUT_EXCHANGE("fanoutExchange", "扇形交换机"), TOPIC_EXCHANGE("topicExchange", "主题交换机"), HEADERS_EXCHANGE("headersExchange", "首部交换机"); private String code; private String name; Exchange(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定义routing_key **/ public enum QueueKey { MARKETING_ACTIVITIE_DIRECT("marketingActivitie", "营销活动key"), MARKETING_ACTIVITIE_TOPIC_01("*.marketingActivitie.*", "营销活动key"), MARKETING_ACTIVITIE_TOPIC_02("marketingActivitie.#", "营销活动key"); private String code; private String name; QueueKey(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } }
View Code

2、数据连接配置类

package cloud.app.prod.home.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Author : YongBo Xie 
* File Name: RabbitConfig.java
* Created Date: 2018年3月28日 下午6:41:17
* Modified Date: 2018年3月28日 下午6:41:17
* Version: 1.0
*/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.publisher-returns}") private Boolean publisherReturns; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; // 构建mq实例工厂 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherReturns(publisherReturns); return connectionFactory; } }
View Code

3、生产者类

package cloud.app.prod.home.rabbitmq;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Author : YongBo Xie 
* File Name: RabbitMqSender.java
* Created Date: 2018年3月30日 上午10:48:36
* Modified Date: 2018年3月30日 上午10:48:36
* Version: 1.0
*/ @Component public class RabbitMqSender { private static Logger logger = Logger.getLogger(RabbitMqSender.class); @Bean public RabbitTemplate messageRabbitTemplate(ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new ConfirmCallback() { /** * 回调 * @param correlationData 消息唯一标识 * @param ack 确认结果 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info("消息唯一标识:"+correlationData); logger.info("确认结果:"+ack); logger.info("失败原因:"+cause); } }); rabbitTemplate.setReturnCallback(new ReturnCallback() { /** * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info(message.getMessageProperties().getCorrelationIdString() + " 发送失败"); } }); return rabbitTemplate; } }
View Code

四、个例

1、初始化队列、消息交换机,并把队列绑定到消息交换机

package cloud.app.prod.home.rabbitmq.mem;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cloud.app.prod.home.rabbitmq.RabbitMqEnum;

/**
 * Author : YongBo Xie 
* File Name: RabbitConfig.java
* Created Date: 2018年3月27日 下午3:13:57
* Modified Date: 2018年3月27日 下午3:13:57
* Version: 1.0
*/ @Configuration public class MarketingActivitieRabbitConfig { // private static Logger logger = Logger.getLogger(MarketingActivitieRabbitConfig.class); /** * 构建队列,名称,是否持久化之类 * @return */ @Bean public Queue marketingActivitieQueue() { return new Queue(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode(), true); } /** * 直连交换机(模式) * 用于实例间的任务分发 * 是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key */ @Bean public DirectExchange createDirectExchange() { return new DirectExchange(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode()); } /** * 扇形交换机(模式) * 分发给所有绑定到该exchange上的队列,忽略routing key * 速度是所有的交换机类型里面最快的 */ @Bean public FanoutExchange createFanoutExchange() { return new FanoutExchange(RabbitMqEnum.Exchange.FANOUT_EXCHANGE.getCode()); } /** * 主题交换机(模式) * 通过可配置的规则分发给绑定在该exchange上的队列 * 发送到主题交换机上的消息需要携带指定规则的routing_key * 交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开 * *表示一个单词 * #表示任意数量(零个或多个)单词 */ @Bean public TopicExchange createTopicExchange() { return new TopicExchange(RabbitMqEnum.Exchange.TOPIC_EXCHANGE.getCode()); } /** * 首部交换机(模式) * 适用规则复杂的分发,用headers里的参数表达规则,有点像HTTP的Headers * 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all, * 这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了 */ @Bean public HeadersExchange createHeadersExchange() { return new HeadersExchange(RabbitMqEnum.Exchange.HEADERS_EXCHANGE.getCode()); } /** * 队列和直连交换机绑定 * @param queue * @param routingKey * @return */ @Bean public Binding bindingQueueWithDirectExchange() { return BindingBuilder.bind(marketingActivitieQueue()).to(createDirectExchange()) .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode()); } /** * 队列和扇形交换机绑定 * @param queue * @return */ @Bean public Binding bindingQueueWithFanoutExchange() { return BindingBuilder.bind(marketingActivitieQueue()).to(createFanoutExchange()); } /** * 队列和主题交换机绑定 * @param queue * @param routingKey * @return */ @Bean public Binding bindingQueueWithTopicExchange() { return BindingBuilder.bind(marketingActivitieQueue()).to(createTopicExchange()) .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_TOPIC_01.getCode()); } /** * 队列和首部交换机绑定 * key和value匹配 * @param queue * @param key * @param value * @return */ // @Bean // public Binding bindingQueueWithHeadersExchange() { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()) // .where(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getCode()) // .matches(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getName()); // } /** * 队列和首部交换机绑定(x-match : all) * 完全匹配 * @param queue * @param headerValues * @return */ // @Bean // public Binding bindingQueueWithHeadersExchangeAll(Map headerValues) { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAll(headerValues).match(); // } /** * 队列和首部交换机绑定(x-match : all) * 任一匹配 * @param queue * @param headerValues * @return */ // @Bean // public Binding bindingQueueWithHeadersExchangeAny(Map headerValues) { // return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAny(headerValues).match(); // } }
View Code

2、生产者

package cloud.app.prod.home.rabbitmq.mem;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import cloud.app.prod.home.common.FailException;
import cloud.app.prod.home.mem.vo.MarketingActivitiesVO;
import cloud.app.prod.home.rabbitmq.RabbitMqEnum;
import cloud.app.prod.home.utils.DSHUtils;

/**
 * Author : YongBo Xie 
* File Name: MarketingActivitieRabbitMqSender.java
* Created Date: 2018年3月28日 下午2:16:32
* Modified Date: 2018年3月28日 下午2:16:32
* Version: 1.0
*/ @Component public class MarketingActivitieRabbitMqSender { private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqSender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message * rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 * rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息 * 针对业务场景选择合适的消息发送方式即可 * @param obj * @throws FailException */ public void sendRabbitmqDirect(MarketingActivitiesVO marketingActivitiesVO) throws FailException { CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID()); logger.info("send: " + correlationData.getId()); rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode(), RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode() , marketingActivitiesVO, correlationData); } public void sendRabbitmqDirect(String exchange, String routingKey, Object obj) throws FailException { CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID()); logger.info("send: " + correlationData.getId()); rabbitTemplate.convertAndSend(exchange, routingKey, obj); } }
View Code

3、消费者

package cloud.app.prod.home.rabbitmq.mem;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import cloud.app.prod.home.rabbitmq.RabbitMqEnum;

/**
 * Author : YongBo Xie 
* File Name: MarketingActivitieRabbitMqReceiver.java
* Created Date: 2018年3月28日 下午3:14:58
* Modified Date: 2018年3月28日 下午3:14:58
* Version: 1.0
*/ @Component public class MarketingActivitieRabbitMqReceiver { private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqReceiver.class); @Bean public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode()); container.setMessageListener(messageListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置为手动 return container; } // @RabbitListener(queues = "marketingActivitieQueue") // @RabbitHandler // public void process(String msg) { // logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息:" + msg); // } @Bean public ChannelAwareMessageListener messageListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { channel.confirmSelect();//在设置消息被消费的回调前需显示调用,否则回调函数无法调用 if (message.toString().indexOf("1") > 0){ logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息1:" + message.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } if (message.toString().indexOf("2") > 0){ logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息2:" + message.toString()); //被拒绝的是否重新入队列 //channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息3:" + message.toString()); } }; } }
View Code

 


推荐阅读
  • 本文详细介绍了Linux系统中用于管理IPC(Inter-Process Communication)资源的两个重要命令:ipcs和ipcrm。通过这些命令,用户可以查看和删除系统中的消息队列、共享内存和信号量。 ... [详细]
  • 如何将Python与Excel高效结合:常用操作技巧解析
    本文深入探讨了如何将Python与Excel高效结合,涵盖了一系列实用的操作技巧。文章内容详尽,步骤清晰,注重细节处理,旨在帮助读者掌握Python与Excel之间的无缝对接方法,提升数据处理效率。 ... [详细]
  • 深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用
    深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 本文详细探讨了使用Python3编写爬虫时如何应对网站的反爬虫机制,通过实例讲解了如何模拟浏览器访问,帮助读者更好地理解和应用相关技术。 ... [详细]
  • 小程序的授权和登陆
    小程序的授权和登陆 ... [详细]
  • Cookie学习小结
    Cookie学习小结 ... [详细]
  • HTTP(HyperTextTransferProtocol)是超文本传输协议的缩写,它用于传送www方式的数据。HTTP协议采用了请求响应模型。客服端向服务器发送一 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 如何在PHP中获取数组中特定元素的索引位置
    在PHP中获取数组中特定元素的索引位置有多种方法。首先,可以使用 `array_search()` 函数,其语法为 `array_search(目标值, $array)`,该函数将返回匹配元素的第一个键名(即下标)。其次,也可以利用 `array_keys()` 函数,通过 `array_keys($array, 目标值)` 语法来获取所有匹配元素的键名列表。这两种方法都能有效解决数组元素定位的问题,具体选择取决于实际需求和性能考虑。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • Python多线程编程技巧与实战应用详解 ... [详细]
  • 深入解析Android 4.4中的Fence机制及其应用
    在Android 4.4中,Fence机制是处理缓冲区交换和同步问题的关键技术。该机制广泛应用于生产者-消费者模式中,确保了不同组件之间高效、安全的数据传输。通过深入解析Fence机制的工作原理和应用场景,本文探讨了其在系统性能优化和资源管理中的重要作用。 ... [详细]
  • 深入解析CAS机制:全面替代传统锁的底层原理与应用
    本文深入探讨了CAS(Compare-and-Swap)机制,分析了其作为传统锁的替代方案在并发控制中的优势与原理。CAS通过原子操作确保数据的一致性,避免了传统锁带来的性能瓶颈和死锁问题。文章详细解析了CAS的工作机制,并结合实际应用场景,展示了其在高并发环境下的高效性和可靠性。 ... [详细]
  • 在Android开发中,BroadcastReceiver(广播接收器)是一个重要的组件,广泛应用于多种场景。本文将深入解析BroadcastReceiver的工作原理、应用场景及其具体实现方法,帮助开发者更好地理解和使用这一组件。通过实例分析,文章详细探讨了静态广播的注册方式、生命周期管理以及常见问题的解决策略,为开发者提供全面的技术指导。 ... [详细]
author-avatar
vicj_iao
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有