官网
轻松搞定RabbitMQ
RabbitMQ的应用场景以及基本原理介绍
springboot+rabbitmq整合示例程
MQ
(Message Queue
,消息队列)是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。是通过读写出入队列的消息来通信(RPC
则是通过直接调用彼此来通信的)。
RabbitMQ
是一个开源的AMQP
实现,服务器端用Erlang
语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP
等,支持AJAX
。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP
协议,即Advanced Message Queuing Protocol
,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP
的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
Exchange
:消息交换机,它指定消息按什么规则,路由到哪个队列。Queue
:消息的载体,每个消息都会被投到一个或多个队列。Binding
:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。Routing Key
:路由关键字,exchange根据这个关键字进行消息投递。vhost
:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。Producer
:消息生产者,就是投递消息的程序。Consumer
:消息消费者,就是接受消息的程序。Channel
:消息通道,在客户端的每个连接里,可建立多个channel。
需要预先安装erlang
http://localhost:15672
生产者和消费者都能可以创建队列,在关注队列的时候需要有一个明确的可监听队列。
生产者和消费者关注的都是队列,都是在队列上定义的操作。
循环分发
消息确认
消息持久化
公平分发
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue.
MQ消息模型中的核心思想是生产者从不直接发送消息到队列中。
Producer sends messages to the “hello” queue. The consumer receives messages from that queue.
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>3.6.5version>
dependency>
Producer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
* Created by PengHongfu 2018-05-14 11:24
*/
public class Producer {
private final static String QUEUE_NAME = "test_queue";//队列名称
public static void main(String args[]) throws IOException, TimeoutException {
// 1.创建连接连接到RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
// 2.设置地址、端口、账号、密码
factory.setHost("localhost");
// 3.获取连接
Connection conn = factory.newConnection();
// 4.获取通道
Channel channel = conn.createChannel();
// 5.创建队列 1-队列名称 2-队列是否持久化 3-队列是否是独占 4-使用完之后是否删除此队列 5-其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 6.发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [Producer] Sent '" + message + "'");
// 7.关闭资源
channel.close();
conn.close();
}
}
Consumer.java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* Created by PengHongfu 2018-05-14 11:50
*/
public class Consumer {
private final static String QUEUE_NAME = "helloWorld_queue";//队列名称
public static void main(String args[]) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [消费者] Waiting for messages. To exit press CTRL+C");
//消费消息
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [消费者] Received '" + message + "'");
}
};
/*
* 监听队列
* 参数1:队列名称
* 参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。 可以通过channel.basicAck手动回复ack
* 参数3:消费者
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
依次启动消费者和生产者
工作队列
Producer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* work模式
* Created by PengHongfu 2018-05-14 14:27
*/
public class Producer {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
//发送的消息间隔越来越长
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消费者处理消息时,利用休眠时间长短来模拟工作任务的轻重
Consumer1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 消费者1
* Created by PengHongfu 2018-05-14 14:29
*/
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息
//channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消费者1] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 手动返回ack包确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通过这两个函数拒绝消息,可以指定消息在服务器删除还是继续投递给其他消费者
}
}
}
Consumer2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* 消费者2
* Created by PengHongfu 2018-05-14 14:29
*/
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者(能者多劳模式),空闲多的消费者,消费更多的消息
//channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消费者2] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
//反馈消息的消费状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
依次启动Consumer1,Consumer2,Producer
,结果如下:
平均每个消费者获得相同数量的消息。这是由于分发消息机制—Round-Robin(轮询),合理的方案是能者多劳,休眠多的消费者,工作任务重的消费者少分发消息任务。下面的代码正是处理这个问题的,使得分配公正。
channel.basicQos(1);
每个消费者在处理完一个消息之后,服务器才会发一条消息给消费者,这使得空闲的消费者,会分配到更多的消息。
发布/订阅
we'll deliver a message to multiple consumers.
There are a few exchange types available:
direct
,topic
,headers
andfanout
Direct Exchange
:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.Fanout Exchange
:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该Exchange路由器才能收到消息,忽略Routing Key.Topic Exchange
:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;Headers Exchange
:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.默认的exchange
:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去
channel.BasicPublish("", "TaskQueue", properties, bytes);
Producer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 订阅模式
* Created by PengHongfu 2018-05-14 16:27
*/
public class Producer {
//交换机的名称
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/*
* 声明exchange(交换机)
* 参数1:交换机名称
* 参数2:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 5; i++) {
String message = "订阅消息-"+i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
Thread.sleep(100);
}
channel.close();
connection.close();
}
}
Consumer1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.File;
import java.io.FileOutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Created by PengHongfu 2018-05-14 16:29
*/
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_exchange_1";//可以是任意的队列名
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/*
* 绑定队列到交换机(这个交换机的名称一定要和上面的生产者交换机名称相同)
* 参数1:队列的名称
* 参数2:交换机的名称
* 参数3:Routing Key
*
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println(" [消费者1] Waiting for messages. To exit press CTRL+C");
// 获取消息
while (true) {
String dir = Consumer1.class.getClassLoader().getResource("").getPath();
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String fileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
File file = new File(dir, fileName + ".log");
FileOutputStream outputStream = new FileOutputStream(file, true);
outputStream.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())) + "-" + message + "rn").getBytes());
outputStream.flush();
outputStream.close();
}
}
}
Consumer2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* Created by PengHongfu 2018-05-14 16:30
*/
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_exchange_2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
c