热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ(1)消息队列中间件使用

RabbitMQ消息队列中间件使用RabbitMQ是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-Mobile到Runtasti

RabbitMQ消息队列中间件使用


RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-Mobile到Runtastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。

RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。



RabbitMQ的介绍

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表使用者保留的消息缓冲区。

(P) -> [|||] -> (C)



  • 生产者:只负责发送消息到队列缓存中。

  • 队列:队列是位于RabbitMQ内的邮筒的名称。RabbitMQ 和应用程序之间的消息都存储在队列中。队列仅受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。生产者可以发送多个消息到同一个队列中,同样多个消费者可以从同一个队列中获取数据。

  • 消费者:只负责获取队列中的数据。

生产者、使用者和代理不必驻留在同一台主机上。事实上,在大多数应用程序中,它们都没有在一台主机上。因此应用程序也可以既是生产者又是消费者。


RabbitMQ的特点



  • 支持多种消息传递协议,消息队列,传递确认,灵活的队列路由,多种交换类型。

  • 使用 Kubernetes、BOSH、Chef、Docker 和 Puppet 进行部署。使用喜欢的编程语言开发跨语言消息传递,例如:Java,.NET,PHP,Python,Javascript,Ruby,Go等。

  • 部署为集群,以实现高可用性和吞吐量;跨多个可用区和区域联合。

  • 可插拔身份验证,授权,支持TLS和LDAP。轻量级且易于在公共云和私有云中部署。

  • 各种工具和插件,支持持续集成、操作指标和与其他企业系统的集成。用于扩展 RabbitMQ 功能的灵活插件方法。

  • HTTP-API,命令行工具和用于管理和监视RabbitMQ的UI。


Docker安装RabbitMQ

一、Docker安装erlang

由于RabbitMQ是erlang语言开发的,所以我们在安装RabbitMQ前先安装erlang,以便运行RabbitMQ,我们本次使用Docker安装因此只需要直接拉取容器即可。通过Docker搜索容器命令docker search --limit 5 erlang搜索到前5条erlang相关容器信息。

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 erlang
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
erlang Erlang is a programming language used to bui… 330 [OK]
erlangsolutions/wombatoam WombatOAM image without a license key 2
circleci/erlang CircleCI images for Erlang 0
erlang/ubuntu-build Ubuntu based build images 0
erlangsolutions/node-with-build-tools https://github.com/esl/node-with-build-tools 0

我直接拉取最新容器:docker pull erlang,如需要拉取特定版本的erlang和RabbitMQ则需要版本对应RabbitMQ Erlang 版本对应要求 — RabbitMQ。

验证erlang是否安装完成,在拉取容器后:docker run correl/erlang echo "hello word",如果回馈"hello word"则完成拉取。

root@iZ059o7jp1sn1wZ:~# docker run erlang echo "hello word"
hello word

本次运行erlang就不配置映射文件了,直接运行docker run -it --name 别名 端口号,需要设置配置文件挂载等请看另一篇博客Docker使用相关指令。

二、Docker安装RabbitMQ

安装完erlang后开始安装RabbitMQ,docker search --limit 5 rabbitmq查询你需要的版本,这边直接拉取最新镜像docker pull rabbitmq

root@iZ059o7jp1sn1wZ:~# docker search --limit 5 rabbitmq
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rabbitmq RabbitMQ is an open source multi-protocol me… 4349 [OK]
bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 86 [OK]
bitnami/rabbitmq-exporter 1
circleci/rabbitmq-delayed https://github.com/circleci/rabbitmq-delayed… 1
circleci/rabbitmq This image is for internal use 0

挂载安装RabbitMQ:docker run -d --name=rabbitmq -p 5672:5672 -p 15672:15672 -v /docker/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq



  • RABBITMQ_DEFAULT_USER:用户账号

  • RABBITMQ_DEFAULT_PASS:用户密码

Web可视化端使用此用户密码登录,如果没设置用户账号和密码,默认的账号和密码为guest/guest。使用docker ps查看是否正常运行RabbitMQ和erlang。

注意: guest用户只能本地(localhost、127.0.0.1)访问Web可视化管理界面,其他IP访问需另创用户并授权

root@iZ059o7jp1sn1wZ:~# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
40d6fe910b2e rabbitmq "docker-entrypoint.s…" 4 minutes ago Up 4 minutes 4369/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
6a499e7c356c 0b68d51c5f30 "erl" 3 hours ago Up 3 hours erlang

安装Web可视化插件

进入RabbitMQ容器docker exec -it 容器ID /bin/bash,安装可视化插件rabbitmq-plugins enable rabbitmq_management

root@iZ059o7jp1sn1wZ:~# docker exec -it 40d6fe910b2e /bin/bash
root@40d6fe910b2e:/# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@40d6fe910b2e:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@40d6fe910b2e...
The following plugins have been enabled:
rabbitmq_management
started 1 plugins.

web进入可视化界面IP:15672,密码为上面设定RabbitMQ的账号密码。

image-20220621113758887

登录成功后的页面

image-20220621113819614


RabbitMQ初の体验(Hello Word)

首先安装相关包,我们可以在RabbitMQ官网找到相关依赖包,RabbitMQ连接需要SLF4J依赖,本次简单的RabbitMQ程序SLF4J Simple已经够用,但你应该在生产中使用一个完整的日志记录库,如Logback。

image-20220623153417303


发送

(P) -> [|||]

发布者将连接到 RabbitMQ,发送一条消息,然后退出。

代码部分

package Demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 1、创建链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.55.34.91");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
// 2、链接
try (Connection cOnnection= factory.newConnection();
// 3、获取通道
Channel channel = connection.createChannel()) {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* 5、arguments:参数
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}

arguments参数:



  • Message TTL | Auto expire | Overflow behaviour | Single active consumer

    Dead letter exchange | Dead letter routing key

    Max length | Max length bytes

    Maximum priority | Lazy mode | Master locator

运行结果

Connected to the target VM, address: '127.0.0.1:60528', transport: 'socket'
[x] Sent 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:60528', transport: 'socket'
Process finished with exit code 0

用Debug运行到连接的时候,我们可以在可视化页面看见连接的用户IP和用户名字

image-20220623153923360


接收

[|||]-> (C)

这就是我们的出版商。我们的消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将让消费者运行以监听消息并将其打印出来。

我们将使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。

设置与发送者相同,我们都需要打开同一个队列,需要和发布者发布的队列一样。

public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}

请注意,我们在此处也声明队列。由于我们可能会在发布者之前启动使用者,因此我们希望在尝试使用队列中的消息之前确保队列存在。

我们为什么不使用try-with-resource语句来自动关闭通道和连接?因为我们希望当消费者异步侦听消息到达时,进程保持活动状态。

我们要从服务器队列中拿取消息。由于它将异步推送消息,因此我们以对象的形式提供回调,该回调将缓冲消息,直到我们要使用它们。这就是DeliverCallback子类的作用。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

代码部分

package Demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.55.34.91");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// DeliverCallback缓冲服务器推送给我们的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

RabbitMQ工作队列

在上面的教程中,我们学会了接受和发送消息,本章节对MQ的接收者消息进行处理。

img


轮询机制

当多个消费者对MQ存储消息进行接收,每个消费者都分配一条,到消息全部被消费。使用任务队列的优点之一是能够轻松并行化工作。如果我们正在积累积压的工作,我们可以添加更多的消费者来进行扩展。

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。将上文发送代码改为,输入来进行轮询测试。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 输入参数
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}

创建2个线程来进行消费,我们可以直接在idea内创建2个线程来进行处理。

image-20220702135152222

image-20220702135400029

创建好2个消费者线程后,启动消费者线程进行消费监听,然后启动发送者发送消息来处理。

发送者:

image-20220702135528889

消费者1:

image-20220702135555780

消费者2:

image-20220702135635700

结论:消息发送线程,发送消息时工作线程会轮询得到消息发送线程发送的消息,这是一种公平的策略,但是这种方式效率较低,在实际工作中一般采用不公平的策略。



推荐阅读
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文介绍了OkHttp3的基本使用和特性,包括支持HTTP/2、连接池、GZIP压缩、缓存等功能。同时还提到了OkHttp3的适用平台和源码阅读计划。文章还介绍了OkHttp3的请求/响应API的设计和使用方式,包括阻塞式的同步请求和带回调的异步请求。 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 开发笔记:计网局域网:NAT 是如何工作的?
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了计网-局域网:NAT是如何工作的?相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 本文讨论了微软的STL容器类是否线程安全。根据MSDN的回答,STL容器类包括vector、deque、list、queue、stack、priority_queue、valarray、map、hash_map、multimap、hash_multimap、set、hash_set、multiset、hash_multiset、basic_string和bitset。对于单个对象来说,多个线程同时读取是安全的。但如果一个线程正在写入一个对象,那么所有的读写操作都需要进行同步。 ... [详细]
  • Java和JavaScript是什么关系?java跟javaScript都是编程语言,只是java跟javaScript没有什么太大关系,一个是脚本语言(前端语言),一个是面向对象 ... [详细]
  • Android自定义控件绘图篇之Paint函数大汇总
    本文介绍了Android自定义控件绘图篇中的Paint函数大汇总,包括重置画笔、设置颜色、设置透明度、设置样式、设置宽度、设置抗锯齿等功能。通过学习这些函数,可以更好地掌握Paint的用法。 ... [详细]
  • C++语言入门:数组的基本知识和应用领域
    本文介绍了C++语言的基本知识和应用领域,包括C++语言与Python语言的区别、C++语言的结构化特点、关键字和控制语句的使用、运算符的种类和表达式的灵活性、各种数据类型的运算以及指针概念的引入。同时,还探讨了C++语言在代码效率方面的优势和与汇编语言的比较。对于想要学习C++语言的初学者来说,本文提供了一个简洁而全面的入门指南。 ... [详细]
author-avatar
阳光碎了围脖_182
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有