Rabbit MQ
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局,RabbitMQ是邮箱,邮局和邮递员。
- Rabbit MQ
- Tutorial 1 - Hello Rabbit MQ
- Rabbit MQ安装
- Rabbit MQ Service运行
- 实现Publisher(Publisher往Broker发送消息)
- 实现Consumer(Consumer从Broker接收消息)
- 运行结果截图
几个重要概念
- 生产者(Producer):消息发送者即为生产者
- 消息队列(Queue):本质上是一个消息缓冲区,可以理解成邮箱,容量受限于主机内存和磁盘;多个生产者可以往同一个消息队列发消息,多个消费者可以从同一个消息队列接收消息
- 消费者(Consumer):消息接收者即为消费者
- 代理(Broker):中间人,它接受消息,转发消息
通常情况下,Producer,Consumer,Broker它们都不在同一台主机上。
Tutorial 1 - Hello Rabbit MQ
本例使用:rabbitmq_server-3.7.4, Intellij Idea; 并且Producer,Broker,Consumer都在统一电脑上。
(1)Broker:直接使用本机,Rabbit MQ Server安装(见下面安装部分)完成后,直接从开始菜单“RabbitMQ Service - start”启动服务。(2)实现Publisher,通过一个线程,每隔5s发送一条消息到Broker。(3)实现Consumer,从Broker接收消息。
Rabbit MQ安装
下载并安装Erlang
下载安装Rabbit MQ
注意它们俩之间的版本对应关系。
Rabbit MQ Service运行
安装完成之后,在开始菜单上有常用命令的快捷方式:
RabbitMQ Service - start
RabbitMQ Service - stop
RabbitMQ Command Prompt(sbin dir)
通过RabbitMQ Command Prompt(sbin dir)开启management plugin:rabbitmq-plugins enable rabbitmq_management
这样就可以通过http://localhost:15672使用RabbitMQ management
使用默认的user=guest,password=guest登录
想要了解RabbitMQ使用的相关端口,参见http://www.rabbitmq.com/install-windows.html页面。
通过 RabbitMQ Service - start启动服务,这就相当于把代理(broker)运行起来了,Producer就可以往它发送message,而Consumer也可以从它接收message。
实现Publisher(Publisher往Broker发送消息)
- 运行Intellij Idea 创建一个maven的quickstart类型的项目
POM.xml依赖如下
<dependencies>
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>5.2.0version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-apiartifactId>
<version>1.7.25version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>1.7.25version>
dependency>
dependencies>
- 新加一个Send类和一个线程实现类用于发送消息
代码实现如下,详细步骤见代码注释
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MySendTask implements Runnable {
private final static String QUEUE_NAME = "hello";
private Connection rbtMqConn;
private Channel rbtMqChnl;
private boolean isStop = false;
public void setIsStop(boolean stop){
this.isStop = stop;
}
@Override
public void run() {
try{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
rbtMqCOnn= factory.newConnection();
rbtMqChnl = rbtMqConn.createChannel();
rbtMqChnl.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello Rabbit MQ!";
while (!isStop){
rbtMqChnl.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(5000);
}
rbtMqChnl.close();
rbtMqConn.close();
}catch(Exception ex){
System.out.println(ex.getMessage());
}
System.out.println(" Send task stop");
}
}
public class Send {
public static void main(String[] argv) throws Exception {
MySendTask sendTask = new MySendTask();
Thread thread = new Thread(sendTask);
thread.start();
Thread.sleep(60000);
sendTask.setIsStop(true);
}
}
实现Consumer(Consumer从Broker接收消息)
和Publisher类似,Consumer也要打开链接和通道(channel),并且要和Publisher匹配。步骤见如下代码。
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer cOnsumer= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
运行结果截图