RabbitMQ 是部署最广泛的开源消息代理。RabbitMQ拥有数以万计的用户,是最受欢迎的开源消息代理之一。从T-Mobile到Runtastic,RabbitMQ在全球范围内用于小型初创企业和大型企业。
RabbitMQ是轻量级的,易于在本地和云中部署。它支持多种消息传递协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。
在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表使用者保留的消息缓冲区。
生产者、使用者和代理不必驻留在同一台主机上。事实上,在大多数应用程序中,它们都没有在一台主机上。因此应用程序也可以既是生产者又是消费者。
一、Docker安装erlang
由于RabbitMQ是erlang语言开发的,所以我们在安装RabbitMQ前先安装erlang,以便运行RabbitMQ,我们本次使用Docker安装因此只需要直接拉取容器即可。通过Docker搜索容器命令docker search --limit 5 erlang
搜索到前5条erlang相关容器信息。
root@iZ059o7jp1sn1wZ:~# docker search --limit 5 erlang
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
erlang Erlang is a programming language used to bui… 330 [OK]
erlangsolutions/wombatoam WombatOAM image without a license key 2
circleci/erlang CircleCI images for Erlang 0
erlang/ubuntu-build Ubuntu based build images 0
erlangsolutions/node-with-build-tools https://github.com/esl/node-with-build-tools 0
我直接拉取最新容器:docker pull erlang
,如需要拉取特定版本的erlang和RabbitMQ则需要版本对应RabbitMQ Erlang 版本对应要求 — RabbitMQ。
验证erlang是否安装完成,在拉取容器后:docker run correl/erlang echo "hello word"
,如果回馈"hello word"则完成拉取。
root@iZ059o7jp1sn1wZ:~# docker run erlang echo "hello word"
hello word
本次运行erlang就不配置映射文件了,直接运行docker run -it --name 别名 端口号
,需要设置配置文件挂载等请看另一篇博客Docker使用相关指令。
二、Docker安装RabbitMQ
安装完erlang后开始安装RabbitMQ,docker search --limit 5 rabbitmq
查询你需要的版本,这边直接拉取最新镜像docker pull rabbitmq
root@iZ059o7jp1sn1wZ:~# docker search --limit 5 rabbitmq
NAME DESCRIPTION STARS OFFICIAL AUTOMATED
rabbitmq RabbitMQ is an open source multi-protocol me… 4349 [OK]
bitnami/rabbitmq Bitnami Docker Image for RabbitMQ 86 [OK]
bitnami/rabbitmq-exporter 1
circleci/rabbitmq-delayed https://github.com/circleci/rabbitmq-delayed… 1
circleci/rabbitmq This image is for internal use 0
挂载安装RabbitMQ:docker run -d --name=rabbitmq -p 5672:5672 -p 15672:15672 -v /docker/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq
Web可视化端使用此用户密码登录,如果没设置用户账号和密码,默认的账号和密码为guest/guest。使用docker ps
查看是否正常运行RabbitMQ和erlang。
注意: guest用户只能本地(localhost、127.0.0.1)访问Web可视化管理界面,其他IP访问需另创用户并授权
root@iZ059o7jp1sn1wZ:~# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
40d6fe910b2e rabbitmq "docker-entrypoint.s…" 4 minutes ago Up 4 minutes 4369/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
6a499e7c356c 0b68d51c5f30 "erl" 3 hours ago Up 3 hours erlang
安装Web可视化插件
进入RabbitMQ容器docker exec -it 容器ID /bin/bash
,安装可视化插件rabbitmq-plugins enable rabbitmq_management
root@iZ059o7jp1sn1wZ:~# docker exec -it 40d6fe910b2e /bin/bash
root@40d6fe910b2e:/# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@40d6fe910b2e:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@40d6fe910b2e...
The following plugins have been enabled:
rabbitmq_management
started 1 plugins.
web进入可视化界面IP:15672,密码为上面设定RabbitMQ的账号密码。
登录成功后的页面
首先安装相关包,我们可以在RabbitMQ官网找到相关依赖包,RabbitMQ连接需要SLF4J依赖,本次简单的RabbitMQ程序SLF4J Simple已经够用,但你应该在生产中使用一个完整的日志记录库,如Logback。
发布者将连接到 RabbitMQ,发送一条消息,然后退出。
代码部分
package Demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 1、创建链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.55.34.91");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
// 2、链接
try (Connection cOnnection= factory.newConnection();
// 3、获取通道
Channel channel = connection.createChannel()) {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* 5、arguments:参数
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
arguments参数:
运行结果
Connected to the target VM, address: '127.0.0.1:60528', transport: 'socket'
[x] Sent 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:60528', transport: 'socket'
Process finished with exit code 0
用Debug运行到连接的时候,我们可以在可视化页面看见连接的用户IP和用户名字
这就是我们的出版商。我们的消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将让消费者运行以监听消息并将其打印出来。
我们将使用额外的DeliverCallback
接口来缓冲服务器推送给我们的消息。
设置与发送者相同,我们都需要打开同一个队列,需要和发布者发布的队列一样。
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
请注意,我们在此处也声明队列。由于我们可能会在发布者之前启动使用者,因此我们希望在尝试使用队列中的消息之前确保队列存在。
我们为什么不使用try-with-resource
语句来自动关闭通道和连接?因为我们希望当消费者异步侦听消息到达时,进程保持活动状态。
我们要从服务器队列中拿取消息。由于它将异步推送消息,因此我们以对象的形式提供回调,该回调将缓冲消息,直到我们要使用它们。这就是DeliverCallback
子类的作用。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
代码部分
package Demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.55.34.91");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// DeliverCallback缓冲服务器推送给我们的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
在上面的教程中,我们学会了接受和发送消息,本章节对MQ的接收者消息进行处理。
当多个消费者对MQ存储消息进行接收,每个消费者都分配一条,到消息全部被消费。使用任务队列的优点之一是能够轻松并行化工作。如果我们正在积累积压的工作,我们可以添加更多的消费者来进行扩展。
默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为轮循机制。将上文发送
代码改为,输入来进行轮询测试。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 输入参数
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
创建2个线程来进行消费,我们可以直接在idea内创建2个线程来进行处理。
创建好2个消费者线程后,启动消费者线程进行消费监听,然后启动发送者发送消息来处理。
发送者:
消费者1:
消费者2:
结论:消息发送线程,发送消息时工作线程会轮询得到消息发送线程发送的消息,这是一种公平的策略,但是这种方式效率较低,在实际工作中一般采用不公平的策略。