热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbitmq消费端代码获取队列名称_RabbitMQ教程

如果此教程对您有帮助,就请有钱的捧个钱场,没钱的捧个人场(转载分享)哦~RabbitMQ实战教程1.什么是MQ消息队列(MessageQueue

如果此教程对您有帮助,就请有钱的捧个钱场,没钱的捧个人场(转载分享)哦~

RabbitMQ实战教程

1.什么是MQ

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。

其主要用途:不同进程Process/线程Thread之间通信。

为什么会产生消息队列?有几个原因:

不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

2.RabbitMQ

2.1.RabbitMQ的简介

开发语言:Erlang – 面向并发的编程语言。

2.1.1.AMQP

AMQP是消息队列的一个协议。

2.2.官网

2.3.MQ的其他产品

2.4.学习5种队列

2.5.安装文档

3.搭建RabbitMQ环境

3.1.下载

3.2.windows下安装

3.2.2.安装RabbitMQ

安装完成。

开始菜单里出现如下选项:

启动、停止、重新安装等。

3.2.3.启用管理工具

1、双击

2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:

rabbitmq-plugins enable rabbitmq_management

这样就启动了管理工具,可以试一下命令:

停止:net stop RabbitMQ

启动:net start RabbitMQ

3、在浏览器中输入地址查看:http://127.0.0.1:15672/

4、使用默认账号登录:guest/ guest

3.3.Linux下安装

3.3.1.安装Erlang

3.3.2.添加yum支持

cd /usr/local/src/

mkdir rabbitmq

cd rabbitmq

使用yum安装:

sudo yum install erlang

3.3.3.安装RabbitMQ

上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/

安装:

rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

3.3.4.启动、停止

service rabbitmq-server start

service rabbitmq-server stop

service rabbitmq-server restart

3.3.5.设置开机启动

chkconfig rabbitmq-server on

3.3.6.设置配置文件

cd /etc/rabbitmq

cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/

mv rabbitmq.config.example rabbitmq.config

3.3.7.开启用户远程访问

vi /etc/rabbitmq/rabbitmq.config

注意要去掉后面的逗号。

3.3.8.开启web界面管理工具

rabbitmq-plugins enable rabbitmq_management

service rabbitmq-server restart

3.3.9.防火墙开放15672端口

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT

/etc/rc.d/init.d/iptables save

3.4.安装的注意事项

1、推荐使用默认的安装路径

2、系统用户名必须是英文

Win10改名字非常麻烦,具体方法百度

3、计算机名必须是英文

4、系统的用户必须是管理员

如果安装失败应该如何解决:

1、重装系统 – 不推荐

2、将RabbitMQ安装到linux虚拟机中

a)推荐

3、使用别人安装好的RabbitMQ服务

a)只要给你开通一个账户即可。

b)使用公用的RabbitMQ服务,在192.168.50.22

c)推荐

常见错误:

3.5.安装完成后操作

1、系统服务中有RabbitMQ服务,停止、启动、重启

2、打开命令行工具

如果找不到命令行工具,直接cd到相应目录:

输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件

查看管理页面

通过默认账户 guest/guest 登录

如果能够登录,说明安装成功。

4.添加用户

4.1.添加admin用户

4.2.用户角色

1、超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

4.3.创建Virtual Hosts

选中Admin用户,设置权限:

看到权限已加:

4.4.管理界面中的功能

5.学习五种队列

5.1.导入my-rabbitmq项目

5.2.简单队列

5.2.1.图示

P:消息的生产者

C:消息的消费者

红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

5.2.2.导入RabbitMQ的客户端依赖

com.rabbitmq

amqp-client

3.4.1

5.2.3.获取MQ的连接 package com.zpc.rabbitmq.util;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

public class ConnectionUtil {

public static Connection getConnection() throws Exception {

//定义连接工厂

ConnectionFactory factory = new ConnectionFactory();

//设置服务地址

factory.setHost("localhost");

//端口

factory.setPort(5672);

//设置账号信息,用户名、密码、vhost

factory.setVirtualHost("testhost");

factory.setUsername("admin");

factory.setPassword("admin");

// 通过工程获取连接

Connection connection = factory.newConnection();

return connection;

}

}

5.2.4.生产者发送消息到队列 package com.zpc.rabbitmq.simple;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

public class Send {

private final static String QUEUE_NAME = "q_test_01";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

// 从连接中创建通道

Channel channel = connection.createChannel();

// 声明(创建)队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 消息内容

String message = "Hello World!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

//关闭通道和连接

channel.close();

connection.close();

}

}

5.2.5.管理工具中查看消息

点击上面的队列名称,查询具体的队列中的信息:

5.2.6.消费者从队列中获取消息 package com.zpc.rabbitmq.simple;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

public class Recv {

private final static String QUEUE_NAME = "q_test_01";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

// 从连接中创建通道

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列

channel.basicConsume(QUEUE_NAME, true, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [x] Received '" + message + "'");

}

}

}

5.3.Work模式

5.3.1.图示

一个生产者、2个消费者。

一个消息只能被一个消费者获取。

5.3.2.消费者1 package com.zpc.rabbitmq.work;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv {

private final static String QUEUE_NAME = "test_queue_work";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 同一时刻服务器只会发一条消息给消费者

//channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,false表示手动返回完成状态,true表示自动

channel.basicConsume(QUEUE_NAME, true, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [y] Received '" + message + "'");

//休眠

Thread.sleep(10);

// 返回确认状态,注释掉表示使用自动确认模式

//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

5.3.3.消费者2 package com.zpc.rabbitmq.work;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_work";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 同一时刻服务器只会发一条消息给消费者

//channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,false表示手动返回完成状态,true表示自动

channel.basicConsume(QUEUE_NAME, true, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [x] Received '" + message + "'");

// 休眠1秒

Thread.sleep(1000);

//下面这行注释掉表示使用自动确认模式

//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

5.3.4.生产者

向队列中发送100条消息。 package com.zpc.rabbitmq.work;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

public class Send {

private final static String QUEUE_NAME = "test_queue_work";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

for (int i &#61; 0; i <100; i&#43;&#43;) {

// 消息内容

String message &#61; "" &#43; i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent &#39;" &#43; message &#43; "&#39;");

Thread.sleep(i * 10);

}

channel.close();

connection.close();

}

}

5.3.5.测试

测试结果&#xff1a;

1、消费者1和消费者2获取到的消息内容是不同的&#xff0c;同一个消息只能被一个消费者获取。

2、消费者1和消费者2获取到的消息的数量是相同的&#xff0c;一个是消费奇数号消息&#xff0c;一个是偶数。

其实&#xff0c;这样是不合理的&#xff0c;因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。

RabbitMQ 默认将消息顺序发送给下一个消费者&#xff0c;这样&#xff0c;每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

怎样才能做到按照每个消费者的能力分配消息呢&#xff1f;联合使用 Qos 和 Acknowledge 就可以做到。

basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者&#xff0c;消费者的 ack 也是异步发送给队列&#xff0c;从队列的视角去看&#xff0c;总是会有一批消息已推送但尚未获得 ack 确认&#xff0c;Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时&#xff0c;队列只有在收到消费者发回的上一条消息 ack 确认后&#xff0c;才会向该消费者发送下一条消息。prefetchCount 的默认值为0&#xff0c;即没有限制&#xff0c;队列会将所有消息尽快发给消费者。

2个概念

轮询分发 &#xff1a;使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作&#xff0c;我们可以通过增加工作者(消费者)来解决这一问题&#xff0c;使得系统的伸缩性更加容易。在默认情况下&#xff0c;RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等&#xff0c;且是提前一次性分配&#xff0c;并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

公平分发 &#xff1a;虽然上面的分配法方式也还行&#xff0c;但是有个问题就是&#xff1a;比如&#xff1a;现在有2个消费者&#xff0c;所有的奇数的消息都是繁忙的&#xff0c;而偶数则是轻松的。按照轮询的方式&#xff0c;奇数的任务交给了第一个消费者&#xff0c;所以一直在忙个不停。偶数的任务交给另一个消费者&#xff0c;则立即完成任务&#xff0c;然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列&#xff0c;RabbitMQ就会分派消息。它不看消费者为应答的数目&#xff0c;只是盲目的将消息发给轮询指定的消费者。

为了解决这个问题&#xff0c;我们使用basicQos( prefetchCount &#61; 1)方法&#xff0c;来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后&#xff0c;有了反馈&#xff0c;才会进行第二次发送。

还有一点需要注意&#xff0c;使用公平分发&#xff0c;必须关闭自动应答&#xff0c;改为手动应答。

5.4.Work模式的“能者多劳”

打开上述代码的注释&#xff1a; // 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1); //开启这行 表示使用手动确认模式

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

同时改为手动确认&#xff1a; // 监听队列&#xff0c;false表示手动返回完成状态&#xff0c;true表示自动

channel.basicConsume(QUEUE_NAME, false, consumer);

测试&#xff1a;

消费者1比消费者2获取的消息更多。

5.5.消息的确认模式

消费者从队列中获取消息&#xff0c;服务端如何知道消息已经被消费呢&#xff1f;

模式1&#xff1a;自动确认

只要消息从队列中获取&#xff0c;无论消费者获取到消息后是否成功消息&#xff0c;都认为是消息已经成功消费。

模式2&#xff1a;手动确认

消费者从队列中获取消息后&#xff0c;服务器会将该消息标记为不可用状态&#xff0c;等待消费者的反馈&#xff0c;如果消费者一直没有反馈&#xff0c;那么该消息将一直处于不可用状态。

手动模式&#xff1a;

自动模式&#xff1a;

5.6.订阅模式

5.6.1.图示

解读&#xff1a;

1、1个生产者&#xff0c;多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列&#xff0c;而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息&#xff0c;经过交换机&#xff0c;到达队列&#xff0c;实现&#xff0c;一个消息被多个消费者获取的目的

注意&#xff1a;一个消费者队列可以有多个消费者实例&#xff0c;只有其中一个消费者实例会消费

5.6.2.消息的生产者(看作是后台系统)

向交换机中发送消息。 package com.zpc.rabbitmq.subscribe;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

public class Send {

private final static String EXCHANGE_NAME &#61; "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection &#61; ConnectionUtil.getConnection();

Channel channel &#61; connection.createChannel();

// 声明exchange

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 消息内容

String message &#61; "Hello World!";

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

System.out.println(" [x] Sent &#39;" &#43; message &#43; "&#39;");

channel.close();

connection.close();

}

}

注意&#xff1a;消息发送到没有队列绑定的交换机时&#xff0c;消息将丢失&#xff0c;因为&#xff0c;交换机没有存储消息的能力&#xff0c;消息只能存在在队列中。

5.6.3.消费者1(看作是前台系统) package com.zpc.rabbitmq.subscribe;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv {

private final static String QUEUE_NAME &#61; "test_queue_work1";

private final static String EXCHANGE_NAME &#61; "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection &#61; ConnectionUtil.getConnection();

Channel channel &#61; connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer &#61; new QueueingConsumer(channel);

// 监听队列&#xff0c;手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery &#61; consumer.nextDelivery();

String message &#61; new String(delivery.getBody());

System.out.println(" [Recv] Received &#39;" &#43; message &#43; "&#39;");

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

5.6.4.消费者2(看作是搜索系统) package com.zpc.rabbitmq.subscribe;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv2 {

private final static String QUEUE_NAME &#61; "test_queue_work2";

private final static String EXCHANGE_NAME &#61; "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection &#61; ConnectionUtil.getConnection();

Channel channel &#61; connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer &#61; new QueueingConsumer(channel);

// 监听队列&#xff0c;手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery &#61; consumer.nextDelivery();

String message &#61; new String(delivery.getBody());

System.out.println(" [Recv2] Received &#39;" &#43; message &#43; "&#39;");

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

5.6.5.测试

测试结果&#xff1a;

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例&#xff0c;只有其中一个消费者实例会消费到消息。

在管理工具中查看队列和交换机的绑定关系&#xff1a;

5.7.路由模式

5.7.1.图示

5.7.2.生产者

5.7.3.消费者1(假设是前台系统)

5.7.4.消费2(假设是搜索系统)

5.8.主题模式(通配符模式)

5.8.1.图示

同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例&#xff0c;只有其中一个消费者实例会消费到消息。

5.8.2.生产者 package com.zpc.rabbitmq.topic;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Send {

private final static String EXCHANGE_NAME &#61; "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection &#61; ConnectionUtil.getConnection();

Channel channel &#61; connection.createChannel();

// 声明exchange

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 消息内容

String message &#61; "Hello World!!";

channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());

System.out.println(" [x] Sent &#39;" &#43; message &#43; "&#39;");

channel.close();

connection.close();

}

}

5.8.3.消费者1(前台系统) package com.zpc.rabbitmq.topic;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zpc.rabbitmq.util.ConnectionUtil;

public class Recv {

private final static String QUEUE_NAME &#61; "test_queue_topic_work_1";

private final static String EXCHANGE_NAME &#61; "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection &#61; ConnectionUtil.getConnection();

Channel channel &#61; connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer &#61; new QueueingConsumer(channel);

// 监听队列&#xff0c;手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery &#61; consumer.nextDelivery();

String message &#61; new String(delivery.getBody());

System.out.println(" [Recv_x] Received &#39;" &#43; message &#43; "&#39;");

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

5.8.4.消费者2(搜索系统) package com.zpc.rabbitmq.topic;

import com.zpc.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

private final static String QUEUE_NAME &#61; "test_queue_topic_work_2";

private final static String EXCHANGE_NAME &#61; "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection &#61; ConnectionUtil.getConnection();

Channel channel &#61; connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer &#61; new QueueingConsumer(channel);

// 监听队列&#xff0c;手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery &#61; consumer.nextDelivery();

String message &#61; new String(delivery.getBody());

System.out.println(" [Recv2_x] Received &#39;" &#43; message &#43; "&#39;");

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

6.Spring-Rabbit

6.1.Spring项目

6.2.简介

6.3.使用

6.3.1.消费者 package com.zpc.rabbitmq.spring;

/**

* 消费者

*

* &#64;author Evan

*/

public class Foo {

//具体执行业务的方法

public void listen(String foo) {

System.out.println("\n消费者&#xff1a; " &#43; foo &#43; "\n");

}

}

6.3.2.生产者 package com.zpc.rabbitmq.spring;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.support.AbstractApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringMain {

public static void main(final String... args) throws Exception {

AbstractApplicationContext ctx &#61; new ClassPathXmlApplicationContext(

"classpath:spring/rabbitmq-context.xml");

//RabbitMQ模板

RabbitTemplate template &#61; ctx.getBean(RabbitTemplate.class);

//发送消息

template.convertAndSend("Hello, 鸟鹏!");

Thread.sleep(1000);// 休眠1秒

ctx.destroy(); //容器销毁

}

}

6.3.3.配置文件

1、定义连接工厂

host&#61;"127.0.0.1" port&#61;"5672" username&#61;"admin" password&#61;"admin"

virtual-host&#61;"testhost" />

2、定义模板(可以指定交换机或队列)

3、定义队列、交换机、以及完成队列和交换机的绑定

4、定义监听

5、定义管理&#xff0c;用于管理队列、交换机等&#xff1a;

完整配置文件rabbitmq-context.xml

xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit&#61;"http://www.springframework.org/schema/rabbit"

xsi:schemaLocation&#61;"http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

host&#61;"127.0.0.1" port&#61;"5672" username&#61;"admin" password&#61;"admin"

virtual-host&#61;"testhost" />

6.4.持久化交换机和队列

持久化&#xff1a;将交换机或队列的数据保存到磁盘&#xff0c;服务器宕机或重启之后依然存在。

非持久化&#xff1a;将交换机或队列的数据保存到内存&#xff0c;服务器宕机或重启之后将不存在。

非持久化的性能高于持久化。

如何选择持久化&#xff1f;非持久化&#xff1f; – 看需求。

7.Spring集成RabbitMQ一个完整案例

7.1.在A系统中发送消息到交换机

7.1.1.导入依赖

xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.zpc

myrabbitA

0.0.1-SNAPSHOT

jar

myrabbit

org.springframework.amqp

spring-rabbit

1.4.0.RELEASE

com.alibaba

fastjson

1.2.47

7.1.2.队列和交换机的绑定关系

实现&#xff1a;

1、在配置文件中将队列和交换机完成绑定

2、可以在管理界面中完成绑定

a)绑定关系如果发生变化&#xff0c;需要修改配置文件&#xff0c;并且服务需要重启

b)管理更加灵活

c)更容易对绑定关系的权限管理&#xff0c;流程管理

本例选择第2种方式

7.1.3.配置

rabbitmq-context.xml

xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit&#61;"http://www.springframework.org/schema/rabbit"

xsi:schemaLocation&#61;"http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

host&#61;"127.0.0.1" port&#61;"5672" username&#61;"admin" password&#61;"admin"

virtual-host&#61;"testhost" />

7.1.4.消息内容

方案&#xff1a;

1、消息内容使用对象做json序列化发送

a)数据大

b)有些数据其他人是可能用不到的

2、发送特定的业务字段&#xff0c;如id、操作类型

7.1.5.实现

生产者MsgSender.java&#xff1a; package com.zpc.myrabbit;

import com.alibaba.fastjson.JSON;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.support.AbstractApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

/**

* 消息生产者

*/

public class MsgSender {

public static void main(String[] args) throws Exception {

AbstractApplicationContext ctx &#61; new ClassPathXmlApplicationContext(

"classpath:spring/rabbitmq-context.xml");

//RabbitMQ模板

RabbitTemplate template &#61; ctx.getBean(RabbitTemplate.class);

String date &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制

//发送消息

Map msg &#61; new HashMap();

msg.put("type", "1");

msg.put("date", date);

template.convertAndSend("type2", JSON.toJSONString(msg));

Thread.sleep(1000);// 休眠1秒

ctx.destroy(); //容器销毁

}

}

7.2.在B系统接收消息

7.2.1.导入依赖

xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.zpc

myrabbitB

0.0.1-SNAPSHOT

war

myrabbit

4.1.3.RELEASE

1.2.46

com.rabbitmq

amqp-client

3.4.1

org.springframework.amqp

spring-rabbit

1.4.0.RELEASE

org.springframework

spring-webmvc

${spring.version}

com.alibaba

fastjson

1.2.47

${project.artifactId}

org.apache.tomcat.maven

tomcat7-maven-plugin

/testRabbit

UTF-8

8081

7.2.2.配置

xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit&#61;"http://www.springframework.org/schema/rabbit"

xsi:schemaLocation&#61;"http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

host&#61;"127.0.0.1" port&#61;"5672" username&#61;"admin" password&#61;"admin"

virtual-host&#61;"testhost" />

7.2.3.具体处理逻辑 public class Listener {

//具体执行业务的方法

public void listen(String msg) {

System.out.println("\n消费者B开始处理消息&#xff1a; " &#43; msg &#43; "\n");

}

}

7.2.4.在界面管理工具中完成绑定关系

选中定义好的交换机(exchange)

1)direct

2)fanout

3)topic

7.3.在C系统中接收消息

(和B系统配置差不多&#xff0c;无非是Q名和Q对应的处理逻辑变了)

7.3.1.配置

xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit&#61;"http://www.springframework.org/schema/rabbit"

xsi:schemaLocation&#61;"http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

host&#61;"127.0.0.1" port&#61;"5672" username&#61;"admin" password&#61;"admin"

virtual-host&#61;"testhost" />

7.3.2.处理业务逻辑 public class Listener {

//具体执行业务的方法

public void listen(String msg) {

System.out.println("\n消费者C开始处理消息&#xff1a; " &#43; msg &#43; "\n");

}

}

7.3.3.在管理工具中绑定队列和交换机

见7.2.4

7.3.4.测试

分别启动B,C两个web应用&#xff0c;然后运行A的MsgSender主方法发送消息&#xff0c;分别测试fanout、direct、topic三种类型

8.Springboot集成RabbitMQ

springboot集成RabbitMQ非常简单&#xff0c;如果只是简单的使用配置非常少&#xff0c;springboot提供了spring-boot-starter-amqp对消息各种支持。

代码下载地址&#xff1a;https://download.csdn.net/download/zpcandzhj/10585077

8.1.简单队列

1、配置pom文件&#xff0c;主要是添加spring-boot-starter-amqp的支持

org.springframework.boot

spring-boot-starter-amqp

2、配置application.properties文件

配置rabbitmq的安装地址、端口以及账户信息 spring.application.name&#61;spirng-boot-rabbitmq

spring.rabbitmq.host&#61;127.0.0.1

spring.rabbitmq.port&#61;5672

spring.rabbitmq.username&#61;admin

spring.rabbitmq.password&#61;admin

3、配置队列 package com.zpc.rabbitmq;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

&#64;Configuration

public class RabbitConfig {

&#64;Bean

public Queue queue() {

return new Queue("q_hello");

}

}

4、发送者 package com.zpc.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;

import java.util.Date;

&#64;Component

public class HelloSender {

&#64;Autowired

private AmqpTemplate rabbitTemplate;

public void send() {

String date &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制

String context &#61; "hello " &#43; date;

System.out.println("Sender : " &#43; context);

//简单对列的情况下routingKey即为Q名

this.rabbitTemplate.convertAndSend("q_hello", context);

}

}

5、接收者 package com.zpc.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_hello")

public class HelloReceiver {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("Receiver : " &#43; hello);

}

}

6、测试 package com.zpc.rabbitmq;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

&#64;RunWith(SpringRunner.class)

&#64;SpringBootTest

public class RabbitMqHelloTest {

&#64;Autowired

private HelloSender helloSender;

&#64;Test

public void hello() throws Exception {

helloSender.send();

}

}

8.2.多对多使用(Work模式)

注册两个Receiver: package com.zpc.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_hello")

public class HelloReceiver2 {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("Receiver2 : " &#43; hello);

}

} &#64;Test

public void oneToMany() throws Exception {

for (int i&#61;0;i<100;i&#43;&#43;){

helloSender.send(i);

Thread.sleep(300);

}

} public void send(int i) {

String date &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制

String context &#61; "hello " &#43; i &#43; " " &#43; date;

System.out.println("Sender : " &#43; context);

//简单对列的情况下routingKey即为Q名

this.rabbitTemplate.convertAndSend("q_hello", context);

}

8.3.Topic Exchange(主题模式)

topic 是RabbitMQ中最灵活的一种方式&#xff0c;可以根据routing_key自由的绑定不同的队列

首先对topic规则配置&#xff0c;这里使用两个队列(消费者)来演示。

1)配置队列&#xff0c;绑定交换机 package com.zpc.rabbitmq.topic;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

&#64;Configuration

public class TopicRabbitConfig {

final static String message &#61; "q_topic_message";

final static String messages &#61; "q_topic_messages";

&#64;Bean

public Queue queueMessage() {

return new Queue(TopicRabbitConfig.message);

}

&#64;Bean

public Queue queueMessages() {

return new Queue(TopicRabbitConfig.messages);

}

/**

* 声明一个Topic类型的交换机

* &#64;return

*/

&#64;Bean

TopicExchange exchange() {

return new TopicExchange("mybootexchange");

}

/**

* 绑定Q到交换机,并且指定routingKey

* &#64;param queueMessage

* &#64;param exchange

* &#64;return

*/

&#64;Bean

Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {

return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");

}

&#64;Bean

Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {

return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");

}

}

2)创建2个消费者

q_topic_message 和q_topic_messages package com.zpc.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_topic_message")

public class Receiver1 {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("Receiver1 : " &#43; hello);

}

} package com.zpc.rabbitmq.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_topic_messages")

public class Receiver2 {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("Receiver2 : " &#43; hello);

}

}

3)消息发送者(生产者) package com.zpc.rabbitmq.topic;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

&#64;Component

public class MsgSender {

&#64;Autowired

private AmqpTemplate rabbitTemplate;

public void send1() {

String context &#61; "hi, i am message 1";

System.out.println("Sender : " &#43; context);

this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);

}

public void send2() {

String context &#61; "hi, i am messages 2";

System.out.println("Sender : " &#43; context);

this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);

}

}

send1方法会匹配到topic.#和topic.message&#xff0c;两个Receiver都可以收到消息&#xff0c;发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。

4)测试 package com.zpc.rabbitmq.topic;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

&#64;RunWith(SpringRunner.class)

&#64;SpringBootTest

public class RabbitTopicTest {

&#64;Autowired

private MsgSender msgSender;

&#64;Test

public void send1() throws Exception {

msgSender.send1();

}

&#64;Test

public void send2() throws Exception {

msgSender.send2();

}

}

8.4.Fanout Exchange(订阅模式)

Fanout 就是我们熟悉的广播模式或者订阅模式&#xff0c;给Fanout交换机发送消息&#xff0c;绑定了这个交换机的所有队列都收到这个消息。

1)配置队列&#xff0c;绑定交换机 package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

&#64;Configuration

public class FanoutRabbitConfig {

&#64;Bean

public Queue aMessage() {

return new Queue("q_fanout_A");

}

&#64;Bean

public Queue bMessage() {

return new Queue("q_fanout_B");

}

&#64;Bean

public Queue cMessage() {

return new Queue("q_fanout_C");

}

&#64;Bean

FanoutExchange fanoutExchange() {

return new FanoutExchange("mybootfanoutExchange");

}

&#64;Bean

Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(aMessage).to(fanoutExchange);

}

&#64;Bean

Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(bMessage).to(fanoutExchange);

}

&#64;Bean

Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {

return BindingBuilder.bind(cMessage).to(fanoutExchange);

}

}

2)创建3个消费者 package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_fanout_A")

public class ReceiverA {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("AReceiver : " &#43; hello &#43; "/n");

}

} package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_fanout_B")

public class ReceiverB {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("BReceiver : " &#43; hello &#43; "/n");

}

} package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

&#64;Component

&#64;RabbitListener(queues &#61; "q_fanout_C")

public class ReceiverC {

&#64;RabbitHandler

public void process(String hello) {

System.out.println("CReceiver : " &#43; hello &#43; "/n");

}

}

3)生产者 package com.zpc.rabbitmq.fanout;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

&#64;Component

public class MsgSenderFanout {

&#64;Autowired

private AmqpTemplate rabbitTemplate;

public void send() {

String context &#61; "hi, fanout msg ";

System.out.println("Sender : " &#43; context);

this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);

}

}

4)测试 package com.zpc.rabbitmq.fanout;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

&#64;RunWith(SpringRunner.class)

&#64;SpringBootTest

public class RabbitFanoutTest {

&#64;Autowired

private MsgSenderFanout msgSender;

&#64;Test

public void send1() throws Exception {

msgSender.send();

}

}

结果如下&#xff0c;三个消费者都收到消息&#xff1a;

AReceiver : hi, fanout msg

CReceiver : hi, fanout msg

BReceiver : hi, fanout msg

9.总结

使用MQ实现商品数据的同步优势&#xff1a;

1、降低系统间耦合度

2、便于管理数据的同步(数据一致性)

欢迎打赏&#xff01;



推荐阅读
  • 本文探讨了Web开发与游戏开发之间的主要区别,旨在帮助开发者更好地理解两种开发领域的特性和需求。文章基于作者的实际经验和网络资料整理而成。 ... [详细]
  • 深入剖析JVM垃圾回收机制
    本文详细探讨了Java虚拟机(JVM)中的垃圾回收机制,包括其意义、对象判定方法、引用类型、常见垃圾收集算法以及各种垃圾收集器的特点和工作原理。通过理解这些内容,开发人员可以更好地优化内存管理和程序性能。 ... [详细]
  • 本文详细探讨了Java中Volatile关键字的工作原理、优化技巧及其在实际开发中的应用场景,特别是在提高多线程环境下数据可见性和减少锁竞争方面的优势。 ... [详细]
  • 本文介绍了一个项目中如何在Windows平台上实现多声道音频数据的采集,特别是针对DANTE音频接口的8路立体声音频通道。文章详细描述了使用Windows底层音频API进行音频采集的方法,并提供了一个具体的实现示例。 ... [详细]
  • Python编码入门指南
    本文探讨了使用Python进行网络设备连通性测试的简便性,特别是针对IP地址范围为192.168.0.101至192.168.0.200的设备。通过比较Python与Shell、Java等语言,展示了Python在执行此类任务时的优势。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 本文详细介绍了ActivityManagerService (AMS) 的工作原理及其在Android系统中的重要角色。AMS作为system_server进程的一部分,在系统启动时加载,负责管理和协调应用程序中的Activity和服务(Service)。文章将通过具体的接口图和通信流程,帮助读者更好地理解AMS的工作机制。 ... [详细]
  • Spring Security核心概念与应用实践
    本文详细介绍了Spring Security的核心机制,包括其作为一系列过滤器的工作原理,如何实现用户认证与授权,以及常见的配置方法和高级特性如CSRF防护。 ... [详细]
  • 本文介绍了 Windows 10 系统中 Internet Explorer 浏览器的版本选择与配置方法,包括如何区分32位和64位IE浏览器以及如何设置默认使用的版本。 ... [详细]
  • 解决ASP.NET Core在IIS中出现的502.5进程失败错误
    本文详细探讨了在Windows Server 2012环境下安装.NET Core后,IIS站点出现502.5错误的原因及解决方案,包括重启服务和系统的方法。 ... [详细]
  • 本文介绍如何创建一个专门用于处理浮点数的JSON处理器,并将其注册到JSON配置器中,以实现对浮点数的精确控制和格式化输出。 ... [详细]
  • Elasticsearch集群构建指南:本地环境搭建与管理
    本文详细介绍了如何在本地环境中搭建Elasticsearch集群,包括节点配置、主节点选举机制、以及如何通过单播和广播方式增加节点。同时,文章还探讨了集群的高可用性和扩展性,以及如何通过配置防止脑裂现象的发生。 ... [详细]
  • 作为140字符的开创者,Twitter看似简单却异常复杂。其简洁之处在于仅用140个字符就能实现信息的高效传播,甚至在多次全球性事件中超越传统媒体的速度。然而,为了支持2亿用户的高效使用,其背后的技术架构和系统设计则极为复杂,涉及高并发处理、数据存储和实时传输等多个技术挑战。 ... [详细]
  • 本文深入解析了 Golang 中的异步 API,并通过具体的应用实例展示了其强大功能。文章不仅探讨了 Golang 和 Erlang 在并行处理方面的核心理念,还详细介绍了如何利用通道(channel)和 goroutine 实现高效的并发编程。例如,通过 `ch := make(chan int)` 创建通道,并使用 `go func(ch chan int)` 启动 goroutine 来处理异步任务。此外,文章还提供了多个实际案例,帮助读者更好地理解和应用这些概念。 ... [详细]
  • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
author-avatar
laomeng爱家人_465
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有