作者:threedayman 恒生LIGHT云社区
RabbitMQ是部署最广泛的开源消息代理。RabbitMQ有轻量级且易部署的特点。支持多种消息协议。
常见的使用场景有解耦、异步、削峰填谷。下面我们通过例子来感受下各自场景下使用MQ带来的效益。
解耦
假设有系统A,依赖系统B、系统C、系统D,依赖关系在代码中已经写死,结构如下图。
假设此时又来了一个新需求,系统A需要调用系统E进行一些新的业务操作,那么系统A的程序员又免不了一顿操作,处理接入系统E的需求。同理如果要去掉某个系统的依赖比如系统C,也需要系统A负责的开发进行处理。
那么此时我们如果引入了MQ来看看会带来什么样的变化。
系统A发送消息到MQ,系统B、C、D订阅对应的消息进行业务处理。那么我们再来看看之前的场景,假设需要增加一个依赖系统E,只需要系统E的开发人员进行对应的订阅消费即可,同理如果要取消系统C的依赖,只需要系统C取消订阅对应的消息。
异步
假设系统A操作耗时30ms,系统A还将同步调用系统B(300ms)、系统C(600ms)、系统D(200ms)那么这个请求的响应时间将会达到1130ms。过长的响应时间会给客户带来不好的用户体验。
引入MQ之后我们看看会发生什么变化
系统A将消息发送给MQ(7ms)之后就返回,系统B、C、D分别监听MQ进行业务处理。那么我们看到针对刚才长耗时的同步依赖,引入MQ进行异步处理后,总体的响应时间从1130ms降到了37ms。
削峰填谷
假设我们有个业务高峰期的请求量能够到达7000 /s而业务低谷流量只有100/s,但是我们的mysql数据库只能承受2000/s的请求。
在这种情况下会导致在高峰期超过了mqsql最高的负载能力而直接打挂,而低峰期没有将mqsql的资源合理利用起来。
引入MQ之后我们看看会发生什么变化
此时系统可以按照自己最大的消费能力2000/s去拉取消息,可以平稳度过业务高峰期,同时将一部分消息延迟到业务低谷时期进行处理。不至于出现由于高流量导致数据库被打挂,出现整体服务不可用的现象。
本小节主要针对RabbitMQ的java客户端编写的几个常用的例子,如果您对使用RabbitMQ已熟练掌握,可跳过本小节。查看完整的RabbitMQ使用说明,请访问官方文档。
Hello world
我们通过一个Hello world 的例子来感受下RabbitMQ。首先介绍下本例中使用到的术语
本例中我们我们将生产Hello World的消息,通过消费者接受并打印出消息。
生产者Send 关键步骤见注释说明
public class Send { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //创建和server之间的连接 connection、channel ConnectionFactory factory = new ConnectionFactory(); //请设置实际部署节点ip factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个queue去发送消息 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
完整Send代码查阅
消费者Recv 关键步骤见注释说明
public class Recv { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //创建和server之间的连接 connection、channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); //声明要去消费的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //通过该类来处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
完整Recv代码查阅
Work Queues
在本例中我们将介绍通过RabbitMQ分发耗时任务给多个工作者。RabbitMQ会通过轮询(round-robin)的方式将消息投递给消费者,这是的我们能够很容易的扩展消费能力。
生产者NewTask 关键步骤见注释说明
public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //将队列设置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); //将消息设置成持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
完整NewTask代码查阅
消费者Woker 关键步骤见注释说明
public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection cOnnection= factory.newConnection(); final Channel channel = connection.createChannel(); //将队列设置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //一个消费者最多同时处理一个未确认的消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } //模拟耗时任务,一个.代表耗时1S private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
完整Worker代码查阅
Publish/Subscribe
上面我们已经介绍过RabbitMQ的核心消息模型,生产者、消费者、队列,在本小节我们将接触到另一个消息模型exchange** ,它负责从生产者中接收消息,并把消息投递到队列中。exchage主要有以下几种类型**
本例中我们将已fanout类型作为讲解,通过名称我们大概也能猜到此类型exchange会广播接收到的消息到其绑定的队列中。
生产者EmitLog 关键步骤见注释说明
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //创建一个exchange 并指定类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length <1 ? "info: Hello World!" : String.join(" ", argv); //此处和之前发消息不一样,指定具体的exchange没有指定具体的queue channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
EmitLog完整代码查阅
消费者ReceiveLogs 关键步骤见注释说明
public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); //创建fanout类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //获取一个独有的,非持久化的,自动删除的队列 String queueName = channel.queueDeclare().getQueue(); //通过绑定方法将exchage和queue之间简历关系 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
ReceiveLogs完整代码查阅
Routing
上一个例子中exchange将接收到的信息广播给了绑定的队列中,本例中我们将增加绑定的一些特定,使exchange有能力通过routingKey(全匹配)来投递不同的消息到不同的队列中。例如日常日志区分error日志进单独的队列。
生产者EmitLogDirect 关键步骤见注释说明
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个direct类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } private static String getSeverity(String[] strings) { if (strings.length <1) return "info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length <2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length <= startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i
EmitLogDirect完整代码查阅
消费者ReceiveLogsDirect 关键步骤见注释说明
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length <1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { //建立exchange和queue之间关系并设置routingKey channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
ReceiveLogsDirect完整代码查阅
Topics
提供更丰富的exchange到queue之间的路由规则。规则通过.分隔的routingKey,最高限制 255bytes。跟之前的全匹配routingKey不同,topic类型的exchange的routingKey主要增加了两个特性。
生产者EmitLogTopic 关键步骤见注释说明
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } private static String getRouting(String[] strings) { if (strings.length <1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length <2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length
EmitLogTopic完整代码查阅
消费者ReceiveLogsTopic 关键步骤见注释说明
public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length <1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
ReceiveLogsTopic完整代码查阅
看到这,各位看官是不是越越欲试想在项目中引入RabbitMQ去优化现在的使用场景,那么是不是我们部署一个RabbitMQ服务,然后发送消息就高枕无忧了呢?其实在引入一个中间件时,同时伴随着一些问题,如果我们对这些问题了解不够深入或者全面,那恭喜你将进入挖坑选手序列。为了成为一个靠谱的程序员,我们要充分了解引入中间件给我们 项目带来的挑战,才能在之后的应用上从容应对。下面列了下消息中间件中常见的几类问题
之后的文章,我们将逐个去讲解上述问题的解决方案。 下一讲:RabbitMQ消息可靠性传输
参考文档
https://www.rabbitmq.com/ RabbitMQ官方文档
tips:作者个人经验有限,不足之处烦请指正。