热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ(1)消息队列中间件使用

RabbitMQ消息队列中间件使用RabbitMQ是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-Mobile到Runtasti

RabbitMQ消息队列中间件使用


RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-Mobile到Runtastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。

RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。



RabbitMQ的介绍

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表使用者保留的消息缓冲区。

(P) -> [|||] -> (C)



  • 生产者:只负责发送消息到队列缓存中。

  • 队列:队列是位于RabbitMQ内的邮筒的名称。RabbitMQ 和应用程序之间的消息都存储在队列中。队列仅受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。生产者可以发送多个消息到同一个队列中,同样多个消费者可以从同一个队列中获取数据。

  • 消费者:只负责获取队列中的数据。

生产者、使用者和代理不必驻留在同一台主机上。事实上,在大多数应用程序中,它们都没有在一台主机上。因此应用程序也可以既是生产者又是消费者。


RabbitMQ的特点



  • 支持多种消息传递协议,消息队列,传递确认,灵活的队列路由,多种交换类型。

  • 使用 Kubernetes、BOSH、Chef、Docker 和 Puppet 进行部署。使用喜欢的编程语言开发跨语言消息传递,例如:Java,.NET,PHP,Python,Javascript,Ruby,Go等。

  • 部署为集群,以实现高可用性和吞吐量;跨多个可用区和区域联合。

  • 可插拔身份验证,授权,支持TLS和LDAP。轻量级且易于在公共云和私有云中部署。

  • 各种工具和插件,支持持续集成、操作指标和与其他企业系统的集成。用于扩展 RabbitMQ 功能的灵活插件方法。

  • HTTP-API,命令行工具和用于管理和监视RabbitMQ的UI。


Docker安装RabbitMQ

一、Docker安装erlang

由于RabbitMQ是erlang语言开发的,所以我们在安装RabbitMQ前先安装erlang,以便运行RabbitMQ,我们本次使用Docker安装因此只需要直接拉取容器即可。通过Docker搜索容器命令docker search --limit 5 erlang搜索到前5条erlang相关容器信息。

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 erlang
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
erlang Erlang is a programming language used to bui… 330 [OK]
erlangsolutions/wombatoam WombatOAM image without a license key 2
circleci/erlang CircleCI images for Erlang 0
erlang/ubuntu-build Ubuntu based build images 0
erlangsolutions/node-with-build-tools https://github.com/esl/node-with-build-tools 0

我直接拉取最新容器:docker pull erlang,如需要拉取特定版本的erlang和RabbitMQ则需要版本对应RabbitMQ Erlang 版本对应要求 — RabbitMQ。

验证erlang是否安装完成,在拉取容器后:docker run correl/erlang echo "hello word",如果回馈"hello word"则完成拉取。

root@iZ059o7jp1sn1wZ:~# docker run erlang echo "hello word"
hello word

本次运行erlang就不配置映射文件了,直接运行docker run -it --name 别名 端口号,需要设置配置文件挂载等请看另一篇博客Docker使用相关指令。

二、Docker安装RabbitMQ

安装完erlang后开始安装RabbitMQ,docker search --limit 5 rabbitmq查询你需要的版本,这边直接拉取最新镜像docker pull rabbitmq

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 rabbitmq
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rabbitmq RabbitMQ is an open source multi-protocol me… 4349 [OK]
bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 86 [OK]
bitnami/rabbitmq-exporter 1
circleci/rabbitmq-delayed https://github.com/circleci/rabbitmq-delayed… 1
circleci/rabbitmq This image is for internal use 0

挂载安装RabbitMQ:docker run -d --name=rabbitmq -p 5672:5672 -p 15672:15672 -v /docker/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq



  • RABBITMQ_DEFAULT_USER:用户账号

  • RABBITMQ_DEFAULT_PASS:用户密码

Web可视化端使用此用户密码登录,如果没设置用户账号和密码,默认的账号和密码为guest/guest。使用docker ps查看是否正常运行RabbitMQ和erlang。

注意: guest用户只能本地(localhost、127.0.0.1)访问Web可视化管理界面,其他IP访问需另创用户并授权

root@iZ059o7jp1sn1wZ:~# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
40d6fe910b2e rabbitmq "docker-entrypoint.s…" 4 minutes ago Up 4 minutes 4369/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
6a499e7c356c 0b68d51c5f30 "erl" 3 hours ago Up 3 hours erlang

安装Web可视化插件

进入RabbitMQ容器docker exec -it 容器ID /bin/bash,安装可视化插件rabbitmq-plugins enable rabbitmq_management

root@iZ059o7jp1sn1wZ:~# docker exec -it 40d6fe910b2e /bin/bash
root@40d6fe910b2e:/# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@40d6fe910b2e:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@40d6fe910b2e...
The following plugins have been enabled:
rabbitmq_management
started 1 plugins.

web进入可视化界面IP:15672,密码为上面设定RabbitMQ的账号密码。

image-20220621113758887

登录成功后的页面

image-20220621113819614


RabbitMQ初の体验(Hello Word)

首先安装相关包,我们可以在RabbitMQ官网找到相关依赖包,RabbitMQ连接需要SLF4J依赖,本次简单的RabbitMQ程序SLF4J Simple已经够用,但你应该在生产中使用一个完整的日志记录库,如Logback。

image-20220623153417303


发送

(P) -> [|||]

发布者将连接到 RabbitMQ,发送一条消息,然后退出。

代码部分

package Demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 1、创建链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.55.34.91");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
// 2、链接
try (Connection cOnnection= factory.newConnection();
// 3、获取通道
Channel channel = connection.createChannel()) {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* 5、arguments:参数
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}

arguments参数:



  • Message TTL | Auto expire | Overflow behaviour | Single active consumer

    Dead letter exchange | Dead letter routing key

    Max length | Max length bytes

    Maximum priority | Lazy mode | Master locator

运行结果

Connected to the target VM, address: '127.0.0.1:60528', transport: 'socket'
[x] Sent 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:60528', transport: 'socket'
Process finished with exit code 0

用Debug运行到连接的时候,我们可以在可视化页面看见连接的用户IP和用户名字

image-20220623153923360


接收

[|||]-> (C)

这就是我们的出版商。我们的消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将让消费者运行以监听消息并将其打印出来。

我们将使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。

设置与发送者相同,我们都需要打开同一个队列,需要和发布者发布的队列一样。

public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}

请注意,我们在此处也声明队列。由于我们可能会在发布者之前启动使用者,因此我们希望在尝试使用队列中的消息之前确保队列存在。

我们为什么不使用try-with-resource语句来自动关闭通道和连接?因为我们希望当消费者异步侦听消息到达时,进程保持活动状态。

我们要从服务器队列中拿取消息。由于它将异步推送消息,因此我们以对象的形式提供回调,该回调将缓冲消息,直到我们要使用它们。这就是DeliverCallback子类的作用。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

代码部分

package Demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.55.34.91");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// DeliverCallback缓冲服务器推送给我们的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

RabbitMQ工作队列

在上面的教程中,我们学会了接受和发送消息,本章节对MQ的接收者消息进行处理。

img


轮询机制

当多个消费者对MQ存储消息进行接收,每个消费者都分配一条,到消息全部被消费。使用任务队列的优点之一是能够轻松并行化工作。如果我们正在积累积压的工作,我们可以添加更多的消费者来进行扩展。

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。将上文发送代码改为,输入来进行轮询测试。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 输入参数
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}

创建2个线程来进行消费,我们可以直接在idea内创建2个线程来进行处理。

image-20220702135152222

image-20220702135400029

创建好2个消费者线程后,启动消费者线程进行消费监听,然后启动发送者发送消息来处理。

发送者:

image-20220702135528889

消费者1:

image-20220702135555780

消费者2:

image-20220702135635700

结论:消息发送线程,发送消息时工作线程会轮询得到消息发送线程发送的消息,这是一种公平的策略,但是这种方式效率较低,在实际工作中一般采用不公平的策略。



推荐阅读
  • 作为140字符的开创者,Twitter看似简单却异常复杂。其简洁之处在于仅用140个字符就能实现信息的高效传播,甚至在多次全球性事件中超越传统媒体的速度。然而,为了支持2亿用户的高效使用,其背后的技术架构和系统设计则极为复杂,涉及高并发处理、数据存储和实时传输等多个技术挑战。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 本文介绍了Spring 2.0引入的TaskExecutor接口及其多种实现,包括同步和异步执行任务的方式。文章详细解释了如何在Spring应用中配置和使用这些线程池实现,以提高应用的性能和可管理性。 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 深入解析Struts、Spring与Hibernate三大框架的面试要点与技巧 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 手指触控|Android电容屏幕驱动调试指南
    手指触控|Android电容屏幕驱动调试指南 ... [详细]
  • 掌握PHP编程必备知识与技巧——全面教程在当今的PHP开发中,了解并运用最新的技术和最佳实践至关重要。本教程将详细介绍PHP编程的核心知识与实用技巧。首先,确保你正在使用PHP 5.3或更高版本,最好是最新版本,以充分利用其性能优化和新特性。此外,我们还将探讨代码结构、安全性和性能优化等方面的内容,帮助你成为一名更高效的PHP开发者。 ... [详细]
  • 我正在使用 Ruby on Rails 构建个人网站。总体而言,RoR 是一个非常出色的工具,它提供了丰富的功能和灵活性,使得创建自定义页面变得既高效又便捷。通过利用其强大的框架和模块化设计,我可以轻松实现复杂的功能,同时保持代码的整洁和可维护性。此外,Rails 的社区支持也非常强大,为开发过程中遇到的问题提供了丰富的资源和解决方案。 ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • 本文将介绍如何在混合开发(Hybrid)应用中实现Native与HTML5的交互,包括基本概念、学习目标以及具体的实现步骤。 ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
author-avatar
阳光碎了围脖_182
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有