热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

SpringBoot入门之消息中间件的使用

一、前言 在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目

一、前言

在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。

我们常用的消息代理有 JMS 和 AMQP 规范。对应地,它们常见的实现分别是 ActiveMQ 和 RabbitMQ。

二、整合 ActiveMQ

2.1 添加依赖


  org.springframework.boot
  spring-boot-starter-activemq


 
  org.apache.activemq 
  activemq-pool 

2.2 添加配置

# activemq 配置
spring.activemq.broker-url=tcp://192.168.2.12:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=false
spring.activemq.pool.max-cOnnections=50
# 使用发布/订阅模式时,下边配置需要设置成 true
spring.jms.pub-sub-domain=false

此处 spring.activemq.pool.enabled=false,表示关闭连接池。

2.3 编码

配置类:

@Configuration
public class JmsConfirguration {
  public static final String QUEUE_NAME = "activemq_queue";
  
  public static final String TOPIC_NAME = "activemq_topic";
  
  @Bean
  public Queue queue() {
    return new ActiveMQQueue(QUEUE_NAME);
  }
  
  @Bean
  public Topic topic() {
    return new ActiveMQTopic(TOPIC_NAME);
  }
}

负责创建队列和主题。

消息生产者:

@Component
public class JmsSender {
  @Autowired
  private Queue queue;
  
  @Autowired
  private Topic topic;
  
  @Autowired
  private JmsMessagingTemplate jmsTemplate;
  
  public void sendByQueue(String message) {
    this.jmsTemplate.convertAndSend(queue, message);
  }
  
  public void sendByTopic(String message) {
    this.jmsTemplate.convertAndSend(topic, message);
  }
}

消息消费者:

@Component
public class JmsReceiver {
  
  @JmsListener(destination = JmsConfirguration.QUEUE_NAME)
  public void receiveByQueue(String message) {
    System.out.println("接收队列消息:" + message);
  }
  
  @JmsListener(destination = JmsConfirguration.TOPIC_NAME)
  public void receiveByTopic(String message) {
    System.out.println("接收主题消息:" + message);
  }
}

消息消费者使用 @JmsListener 注解监听消息。

2.4 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsTest {
  @Autowired
  private JmsSender sender;
  @Test
  public void testSendByQueue() {
    for (int i = 1; i <6; i++) {
      this.sender.sendByQueue("hello activemq queue " + i);
    }
  }
  
  @Test
  public void testSendByTopic() {
    for (int i = 1; i <6; i++) {
      this.sender.sendByTopic("hello activemq topic " + i);
    }
  }
}

打印结果:

接收队列消息:hello activemq queue 1
接收队列消息:hello activemq queue 2
接收队列消息:hello activemq queue 3
接收队列消息:hello activemq queue 4
接收队列消息:hello activemq queue 5

测试发布/订阅模式时,设置 spring.jms.pub-sub-domain=true

接收主题消息:hello activemq topic 1
接收主题消息:hello activemq topic 2
接收主题消息:hello activemq topic 3
接收主题消息:hello activemq topic 4
接收主题消息:hello activemq topic 5

三、整合 RabbitMQ

3.1 添加依赖


  org.springframework.boot
  spring-boot-starter-amqp

3.2 添加配置

spring.rabbitmq.host=192.168.2.30
spring.rabbitmq.port=5672
spring.rabbitmq.username=light
spring.rabbitmq.password=light
spring.rabbitmq.virtual-host=/test

3.3 编码

配置类:

@Configuration
public class AmqpConfirguration {
  //=============简单、工作队列模式===============
  
  public static final String SIMPLE_QUEUE = "simple_queue";
  @Bean
  public Queue queue() {
    return new Queue(SIMPLE_QUEUE, true);
  }
  
  //===============发布/订阅模式============
  
  public static final String PS_QUEUE_1 = "ps_queue_1";
  public static final String PS_QUEUE_2 = "ps_queue_2";
  public static final String FANOUT_EXCHANGE = "fanout_exchange";
  
  @Bean
  public Queue psQueue1() {
    return new Queue(PS_QUEUE_1, true);
  }
  
  @Bean
  public Queue psQueue2() {
    return new Queue(PS_QUEUE_2, true);
  }
  
  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(FANOUT_EXCHANGE);
  }
  
  @Bean
  public Binding fanoutBinding1() {
    return BindingBuilder.bind(psQueue1()).to(fanoutExchange());
  }
  
  @Bean
  public Binding fanoutBinding2() {
    return BindingBuilder.bind(psQueue2()).to(fanoutExchange());
  }
  //===============路由模式============
  
  public static final String ROUTING_QUEUE_1 = "routing_queue_1";
  public static final String ROUTING_QUEUE_2 = "routing_queue_2";
  public static final String DIRECT_EXCHANGE = "direct_exchange";
  
  @Bean
  public Queue routingQueue1() {
    return new Queue(ROUTING_QUEUE_1, true);
  }
  
  @Bean
  public Queue routingQueue2() {
    return new Queue(ROUTING_QUEUE_2, true);
  }
  
  @Bean
  public DirectExchange directExchange() {
    return new DirectExchange(DIRECT_EXCHANGE);
  }
  
  @Bean
  public Binding directBinding1() {
    return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");
  }
  
  @Bean
  public Binding directBinding2() {
    return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");
  }
  
  //===============主题模式============
  
  public static final String TOPIC_QUEUE_1 = "topic_queue_1";
  public static final String TOPIC_QUEUE_2 = "topic_queue_2";
  public static final String TOPIC_EXCHANGE = "topic_exchange";
  
  @Bean
  public Queue topicQueue1() {
    return new Queue(TOPIC_QUEUE_1, true);
  }
  
  @Bean
  public Queue topicQueue2() {
    return new Queue(TOPIC_QUEUE_2, true);
  }
  
  @Bean
  public TopicExchange topicExchange() {
    return new TopicExchange(TOPIC_EXCHANGE);
  }
  
  @Bean
  public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");
  }
  
  @Bean
  public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
  }  
}

RabbitMQ 有多种工作模式,因此配置比较多。想了解相关内容的读者可以查看《RabbitMQ 工作模式介绍》或者自行百度相关资料。

消息生产者:

@Component
public class AmqpSender {
  @Autowired
  private AmqpTemplate amqpTemplate;
  /**
   * 简单模式发送
   * 
   * @param message
   */
  public void simpleSend(String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message);
  }
  /**
   * 发布/订阅模式发送
   * 
   * @param message
   */
  public void psSend(String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message);
  }
  /**
   * 路由模式发送
   * 
   * @param message
   */
  public void routingSend(String routingKey, String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message);
  }
  /**
   * 主题模式发送
   * 
   * @param routingKey
   * @param message
   */
  public void topicSend(String routingKey, String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message);
  }
}

消息消费者:

@Component
public class AmqpReceiver {
  /**
   * 简单模式接收
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE)
  public void simpleReceive(String message) {
    System.out.println("接收消息:" + message);
  }
  /**
   * 发布/订阅模式接收
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1)
  public void psReceive1(String message) {
    System.out.println(AmqpConfirguration.PS_QUEUE_1 + "接收消息:" + message);
  }
  @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2)
  public void psReceive2(String message) {
    System.out.println(AmqpConfirguration.PS_QUEUE_2 + "接收消息:" + message);
  }
  /**
   * 路由模式接收
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1)
  public void routingReceive1(String message) {
    System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "接收消息:" + message);
  }
  @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2)
  public void routingReceive2(String message) {
    System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "接收消息:" + message);
  }
  /**
   * 主题模式接收
   * 
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1)
  public void topicReceive1(String message) {
    System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "接收消息:" + message);
  }
  
  @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2)
  public void topicReceive2(String message) {
    System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "接收消息:" + message);
  }
}

消息消费者使用 @RabbitListener 注解监听消息。

3.4 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpTest {
  @Autowired
  private AmqpSender sender;
  @Test
  public void testSimpleSend() {
    for (int i = 1; i <6; i++) {
      this.sender.simpleSend("test simpleSend " + i);
    }
  }
  @Test
  public void testPsSend() {
    for (int i = 1; i <6; i++) {
      this.sender.psSend("test psSend " + i);
    }
  }
  
  @Test
  public void testRoutingSend() {
    for (int i = 1; i <6; i++) {
      this.sender.routingSend("order", "test routingSend " + i);
    }
  }
  
  @Test
  public void testTopicSend() {
    for (int i = 1; i <6; i++) {
      this.sender.topicSend("user.add", "test topicSend " + i);
    }
  }
}

测试结果略过。。。

踩坑提醒1:ACCESS_REFUSED – Login was refused using authentication mechanism PLAIN

解决方案:

1) 请确保用户名和密码是否正确,需要注意的是用户名和密码的值是否包含空格或制表符(笔者测试时就是因为密码多了一个制表符导致认证失败)。

2) 如果测试账户使用的是 guest,需要修改 rabbitmq.conf 文件。在该文件中添加 “loopback_users = none” 配置。

踩坑提醒2:Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it

解决方案:

我们可以登陆 RabbitMQ 的管理界面,在 Queue 选项中手动添加对应的队列。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 如何提高PHP编程技能及推荐高级教程
    本文介绍了如何提高PHP编程技能的方法,推荐了一些高级教程。学习任何一种编程语言都需要长期的坚持和不懈的努力,本文提醒读者要有足够的耐心和时间投入。通过实践操作学习,可以更好地理解和掌握PHP语言的特异性,特别是单引号和双引号的用法。同时,本文也指出了只走马观花看整体而不深入学习的学习方式无法真正掌握这门语言,建议读者要从整体来考虑局部,培养大局观。最后,本文提醒读者完成一个像模像样的网站需要付出更多的努力和实践。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 缤果串口网络蓝牙调试助手的特点和下载链接
    本文介绍了缤果串口网络蓝牙调试助手的主要特点,包括支持常用的波特率、校验、数据位和停止位设置,以及以ASCII码或十六进制接收或发送数据或字符的功能。该助手还能任意设定自动发送周期,并能将接收数据保存成文本文件。同时,该软件支持网络UDP/TCP和蓝牙功能。最后,提供了腾讯微云和百度网盘的下载链接。 ... [详细]
  • 本文介绍了C++中的引用运算符及其应用。引用运算符是一种将变量定义为另一个变量的引用变量的方式,在改变其中一个变量时,两者均会同步变化。引用变量来源于数学,在计算机语言中用于储存计算结果或表示值抽象概念。变量可以通过变量名访问,在指令式语言中引用变量通常是可变的,但在纯函数式语言中可能是不可变的。本文还介绍了引用变量的示例及验证,以及引用变量在函数形参中的应用。当定义的函数使用引用型形参时,函数调用时形参的改变会同时带来实参的改变。 ... [详细]
  • 本文讨论了如何查看js的一些方法的官方文档,作者提到了在实现打印功能时遇到了困惑,不知道如何查看方法。虽然百度有时可以得到答案,但作者想要知道官方文档的用法,因为有时候百度并不能满足自己的需求。 ... [详细]
  • SLAM中相机运动估计的基本问题及解决方案
    本文讨论了SLAM中相机运动估计的基本问题,指出了解决方案的存在。作者认为阅读相关SLAM书籍是掌握基础原理的有效途径,而不是仅仅依赖现成的解决方案。同时,作者也提到了激光雷达和特征点匹配等技术在SLAM中的应用,并建议读者深入理解相关原理,而不是盲目追求现成的代码。 ... [详细]
  • 英语思维导图大全 词汇与语法结构详解
    本文详细介绍了英语思维导图大全中的词汇与语法结构,包括新鲜一感的理解和订阅后获取百度网盘链接的方法。通过阅读本文,您将对英语思维导图的相关知识有更深入的了解。 ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • SpringMVC接收请求参数的方式总结
    本文总结了在SpringMVC开发中处理控制器参数的各种方式,包括处理使用@RequestParam注解的参数、MultipartFile类型参数和Simple类型参数的RequestParamMethodArgumentResolver,处理@RequestBody注解的参数的RequestResponseBodyMethodProcessor,以及PathVariableMapMethodArgumentResol等子类。 ... [详细]
  • 本文由编程笔记#小编整理,主要介绍了关于数论相关的知识,包括数论的算法和百度百科的链接。文章还介绍了欧几里得算法、辗转相除法、gcd、lcm和扩展欧几里得算法的使用方法。此外,文章还提到了数论在求解不定方程、模线性方程和乘法逆元方面的应用。摘要长度:184字。 ... [详细]
  • 嵌入式处理器的架构与内核发展历程
    本文主要介绍了嵌入式处理器的架构与内核发展历程,包括不同架构的指令集的变化,以及内核的流水线和结构。通过对ARM架构的分析,可以更好地理解嵌入式处理器的架构与内核的关系。 ... [详细]
  • HTML5网页模板怎么加百度统计?
    本文介绍了如何在HTML5网页模板中加入百度统计,并对模板文件、css样式表、js插件库等内容进行了说明。同时还解答了关于HTML5网页模板的使用方法、表单提交、域名和空间的问题,并介绍了如何使用Visual Studio 2010创建HTML5模板。此外,还提到了使用Jquery编写美好的HTML5前端框架模板的方法,以及制作企业HTML5网站模板和支持HTML5的CMS。 ... [详细]
  • 分享css中提升优先级属性!important的用法总结
    web前端|css教程css!importantweb前端-css教程本文分享css中提升优先级属性!important的用法总结微信门店展示源码,vscode如何管理站点,ubu ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
author-avatar
骑蜗牛追神81986
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有