热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ消息中间件快速入门:SpringBoot整合生产者与消费者

前言本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢?我们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。

求关注

快速入门生产者与消费者,SpringBoot整合RabbitMQ!


前言

本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢?我们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。

我们的消费端进行监听RabbitMQ,当发现队列中有消息后,就进行消费。


1. 环境准备

本次整合主要采用SpringBoot框架,需要对SpringBoot的使用有一定了解。


2.大概步骤

我们来看下大概步骤:



  • ConnectionFacorty:获取连接工厂

  • Connection:一个连接

  • Channel:数据通信信道,可发送和接收消息

  • Queue:具体的消息存储队列

  • Producer & Consumer 生产者和消费者

这个连接工厂需要配置一些相应的信息,例如: RabbitMQ节点的地址,端口号,VirtualHost等等。

Channel是我们RabbitMQ所有消息进行交互的关键。


3. 项目实战


3.1 连接工厂


/**
*
* @ClassName: ConnectionUtils
* @Description: 连接工具类
* @author Coder编程
* @date 2019年6月21日 上午22:28:22
*
*/
public class ConnectionUtils {
public static Connection getConnection() throws IOException, TimeoutException {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);//amqp协议 端口 类似与mysql的3306
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/vhost_cp");
factory.setUsername("user_cp");
factory.setPassword("123456");
// 通过工程获取连接
Connection cOnnection= factory.newConnection();
return connection;
}
}

3.2 生产端


/**
*
* @ClassName: Producer
* @Description: 生产者
* @author Coder编程
* @date 2019年7月30日 上午21:04:43
*
*/
public class Producer {

public static void main(String[] args) throws Exception {

System.out.println("Producer start...");

//1 创建ConnectionFactory
Connection cOnnection= ConnectionUtils.getConnection();

//2 通过connection创建一个Channel
Channel channel = connection.createChannel();

//3 通过Channel发送数据
for(int i=0; i <5; i++){
String msg = "Hello RabbitMQ!";
//1 exchange 2 routingKey
channel.basicPublish("", "test001", null, msg.getBytes());
}
//4 记得要关闭相关的连接
channel.close();
connection.close();
}
}

3.3 消费端


/**
*
* @ClassName: Consumer
* @Description: 消费端
* @author Coder编程
* @date 2019年7月30日 上午21:08:12
*
*/
public class Consumer {
public static void main(String[] args) throws Exception {

System.out.println("Consumer start...");

//1 创建ConnectionFactory
Connection cOnnection= ConnectionUtils.getConnection();

//2通过connection创建一个Channel
Channel channel = connection.createChannel();

//3声明(创建)一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);

//4创建消费者
QueueingConsumer queueingCOnsumer= new QueueingConsumer(channel);

//5设置Channel
channel.basicConsume(queueName, true, queueingConsumer);

while(true){
//6 获取消息
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
//Envelope envelope = delivery.getEnvelope();
}

}
}

3.4 源码解析


channel.queueDeclare(queueName, true, false, false, null);

底层代码

第一个参数:queuename:队列的名称

第二个参数:durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失

第三个参数:exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。

第四个参数:autoDelete:队列如果与Exchange未绑定,则自动删除

第五个参数:arguments:扩展参数


channel.basicConsume(QUEUE_NAME, true, consumer);

第二个参数 autoAck:自动签收消息


3.5 运行程序

(1)启动消费端

启动消费端

(2)查看管控台

Overview

可以看到已经有一个连接,一个信道,一个消费者等信息了。

Connections

Channels

可以看到信道目前的状态是空闲状态。

queues

队列中多了test001队列。


关于管控台的介绍可以看这篇文章:消息中间件——RabbitMQ(四)命令行与管控台的基本操作!


(3)运行生产端

运行生产端

消费端收到消息

可以看到生产端发送完消息之后停下了,消费端迅速接收到了消息。也可以继续通过管控台观察消费的情况。

(4) 问题

注意:

这里面可能有一个问题:为什么要先启动消费端呢?

因为在消费端创建的队列,我们必须要有队列,才能够发送消息。

另一个问题:在生产端代码中:


channel.basicPublish("", "test001", null, msg.getBytes());

并没有设置exchange,只设置了队列名称,消费端却依然能够消费到消息,这是为什么呢?

答:发消息的一定要指定Exchange,如果不指定Exchange或者Exchange为空的话,它会默认走第一个

默认Exchange

它的路由规则:将相同命名的队列Queue的消息路由过去,如果路由不过去,将会把消息删除。


文末


欢迎关注个人微信公众号:Coder编程

获取最新原创技术文章和免费学习资料,更有大量精品思维导图、面试资料、PMP备考资料等你来领,方便你随时随地学习技术知识!

新建了一个qq群:315211365,欢迎大家进群交流一起学习。谢谢了!也可以介绍给身边有需要的朋友。



文章收录至

Github: https://github.com/CoderMerlin/coder-programming

Gitee: https://gitee.com/573059382/coder-programming

欢迎关注并star~

微信公众号


参考文章:

https://www.cnblogs.com/myJavaEE/p/6665166.html

《RabbitMQ消息中间件精讲》

推荐文章:

消息中间件——RabbitMQ(二)各大主流消息中间件综合对比介绍!

消息中间件——RabbitMQ(三)理解RabbitMQ核心概念和AMQP协议!

消息中间件——RabbitMQ(四)命令行与管控台的基本操作!



推荐阅读
  • 作者:守望者1028链接:https:www.nowcoder.comdiscuss55353来源:牛客网面试高频题:校招过程中参考过牛客诸位大佬的面经,但是具体哪一块是参考谁的我 ... [详细]
  • 微软Exchange服务器遭遇2022年版“千年虫”漏洞
    微软Exchange服务器在新年伊始遭遇了一个类似于‘千年虫’的日期处理漏洞,导致邮件传输受阻。该问题主要影响配置了FIP-FS恶意软件引擎的Exchange 2016和2019版本。 ... [详细]
  • 根据最新发布的《互联网人才趋势报告》,尽管大量IT从业者已转向Python开发,但随着人工智能和大数据领域的迅猛发展,仍存在巨大的人才缺口。本文将详细介绍如何使用Python编写一个简单的爬虫程序,并提供完整的代码示例。 ... [详细]
  • 本文探讨了 Spring Boot 应用程序在不同配置下支持的最大并发连接数,重点分析了内置服务器(如 Tomcat、Jetty 和 Undertow)的默认设置及其对性能的影响。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 本文详细介绍了 Dockerfile 的编写方法及其在网络配置中的应用,涵盖基础指令、镜像构建与发布流程,并深入探讨了 Docker 的默认网络、容器互联及自定义网络的实现。 ... [详细]
  • 在前两篇文章中,我们探讨了 ControllerDescriptor 和 ActionDescriptor 这两个描述对象,分别对应控制器和操作方法。本文将基于 MVC3 源码进一步分析 ParameterDescriptor,即用于描述 Action 方法参数的对象,并详细介绍其工作原理。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 基于KVM的SRIOV直通配置及性能测试
    SRIOV介绍、VF直通配置,以及包转发率性能测试小慢哥的原创文章,欢迎转载目录?1.SRIOV介绍?2.环境说明?3.开启SRIOV?4.生成VF?5.VF ... [详细]
  • 探讨如何真正掌握Java EE,包括所需技能、工具和实践经验。资深软件教学总监李刚分享了对毕业生简历中常见问题的看法,并提供了详尽的标准。 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文详细探讨了Java中的24种设计模式及其应用,并介绍了七大面向对象设计原则。通过创建型、结构型和行为型模式的分类,帮助开发者更好地理解和应用这些模式,提升代码质量和可维护性。 ... [详细]
  • 深入理解 SQL 视图、存储过程与事务
    本文详细介绍了SQL中的视图、存储过程和事务的概念及应用。视图为用户提供了一种灵活的数据查询方式,存储过程则封装了复杂的SQL逻辑,而事务确保了数据库操作的完整性和一致性。 ... [详细]
author-avatar
风飞满天2602938511
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有