热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

消息中间件之RabbitMQ初识

消息,中间件,之,ra

作者:threedayman 恒生LIGHT云社区

RabbitMQ是什么

RabbitMQ是部署最广泛的开源消息代理。RabbitMQ有轻量级且易部署的特点。支持多种消息协议。

为什么使用RabbitMQ

常见的使用场景有解耦、异步、削峰填谷。下面我们通过例子来感受下各自场景下使用MQ带来的效益。

解耦

假设有系统A,依赖系统B、系统C、系统D,依赖关系在代码中已经写死,结构如下图。

1621939617(1).png

假设此时又来了一个新需求,系统A需要调用系统E进行一些新的业务操作,那么系统A的程序员又免不了一顿操作,处理接入系统E的需求。同理如果要去掉某个系统的依赖比如系统C,也需要系统A负责的开发进行处理。

那么此时我们如果引入了MQ来看看会带来什么样的变化。

image-20210525185231787.png

系统A发送消息到MQ,系统B、C、D订阅对应的消息进行业务处理。那么我们再来看看之前的场景,假设需要增加一个依赖系统E,只需要系统E的开发人员进行对应的订阅消费即可,同理如果要取消系统C的依赖,只需要系统C取消订阅对应的消息。

异步

假设系统A操作耗时30ms,系统A还将同步调用系统B(300ms)、系统C(600ms)、系统D(200ms)那么这个请求的响应时间将会达到1130ms。过长的响应时间会给客户带来不好的用户体验。

1621940629(1).png

引入MQ之后我们看看会发生什么变化

image-20210525190839346.png

系统A将消息发送给MQ(7ms)之后就返回,系统B、C、D分别监听MQ进行业务处理。那么我们看到针对刚才长耗时的同步依赖,引入MQ进行异步处理后,总体的响应时间从1130ms降到了37ms。

削峰填谷

假设我们有个业务高峰期的请求量能够到达7000 /s而业务低谷流量只有100/s,但是我们的mysql数据库只能承受2000/s的请求。

1621941575(1).png

在这种情况下会导致在高峰期超过了mqsql最高的负载能力而直接打挂,而低峰期没有将mqsql的资源合理利用起来。

引入MQ之后我们看看会发生什么变化

image-20210525192439287.png

此时系统可以按照自己最大的消费能力2000/s去拉取消息,可以平稳度过业务高峰期,同时将一部分消息延迟到业务低谷时期进行处理。不至于出现由于高流量导致数据库被打挂,出现整体服务不可用的现象。

怎样使用RabbitMQ

本小节主要针对RabbitMQ的java客户端编写的几个常用的例子,如果您对使用RabbitMQ已熟练掌握,可跳过本小节。查看完整的RabbitMQ使用说明,请访问官方文档

Hello world

我们通过一个Hello world 的例子来感受下RabbitMQ。首先介绍下本例中使用到的术语

  • Producer:生产者,用来发送消息。
  • Queue:消息队列,用于存储消息,消息经由生产者投递到消息队列,最终被投递到消费者进行消费,消息队列收到机器内存和硬盘资源的限制。
  • Consumer:消费者,用于接收并处理消息。

本例中我们我们将生产Hello World的消息,通过消费者接受并打印出消息。

1621942986(1).png

生产者Send 关键步骤见注释说明

public class Send { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //创建和server之间的连接 connection、channel ConnectionFactory factory = new ConnectionFactory(); //请设置实际部署节点ip factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个queue去发送消息 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //发布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } } 

完整Send代码查阅

消费者Recv 关键步骤见注释说明

public class Recv { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //创建和server之间的连接 connection、channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); //声明要去消费的队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //通过该类来处理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } 

完整Recv代码查阅

Work Queues

在本例中我们将介绍通过RabbitMQ分发耗时任务给多个工作者。RabbitMQ会通过轮询(round-robin)的方式将消息投递给消费者,这是的我们能够很容易的扩展消费能力。

1621944984(1).png

生产者NewTask 关键步骤见注释说明

public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //将队列设置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); //将消息设置成持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } 

完整NewTask代码查阅

消费者Woker 关键步骤见注释说明

public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection cOnnection= factory.newConnection(); final Channel channel = connection.createChannel(); //将队列设置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //一个消费者最多同时处理一个未确认的消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } //模拟耗时任务,一个.代表耗时1S private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } } 

完整Worker代码查阅

Publish/Subscribe

上面我们已经介绍过RabbitMQ的核心消息模型,生产者、消费者、队列,在本小节我们将接触到另一个消息模型exchange** ,它负责从生产者中接收消息,并把消息投递到队列中。exchage主要有以下几种类型**

  • direct
  • topic
  • headers
  • fanout

本例中我们将已fanout类型作为讲解,通过名称我们大概也能猜到此类型exchange会广播接收到的消息到其绑定的队列中。

1622082071(1).jpg

生产者EmitLog 关键步骤见注释说明

public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //创建一个exchange 并指定类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length <1 ? "info: Hello World!" : String.join(" ", argv); //此处和之前发消息不一样,指定具体的exchange没有指定具体的queue channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } } 

EmitLog完整代码查阅

消费者ReceiveLogs 关键步骤见注释说明

public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); //创建fanout类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //获取一个独有的,非持久化的,自动删除的队列 String queueName = channel.queueDeclare().getQueue(); //通过绑定方法将exchage和queue之间简历关系 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } 

ReceiveLogs完整代码查阅

Routing

上一个例子中exchange将接收到的信息广播给了绑定的队列中,本例中我们将增加绑定的一些特定,使exchange有能力通过routingKey(全匹配)来投递不同的消息到不同的队列中。例如日常日志区分error日志进单独的队列。

image-20210526095222998.png

生产者EmitLogDirect 关键步骤见注释说明

public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { //声明一个direct类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } private static String getSeverity(String[] strings) { if (strings.length <1) return "info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length <2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length <= startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i 

EmitLogDirect完整代码查阅

消费者ReceiveLogsDirect 关键步骤见注释说明

public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length <1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { //建立exchange和queue之间关系并设置routingKey channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } 

ReceiveLogsDirect完整代码查阅

Topics

提供更丰富的exchange到queue之间的路由规则。规则通过.分隔的routingKey,最高限制 255bytes。跟之前的全匹配routingKey不同,topic类型的exchange的routingKey主要增加了两个特性。

  • *代表一个单词**。**
  • **#** 代表0个或一个单词。

image-20210526101017541.png

生产者EmitLogTopic 关键步骤见注释说明

public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } private static String getRouting(String[] strings) { if (strings.length <1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length <2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length 

EmitLogTopic完整代码查阅

消费者ReceiveLogsTopic 关键步骤见注释说明

public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection cOnnection= factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length <1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } 

ReceiveLogsTopic完整代码查阅

引入RabbitMQ带来什么挑战

看到这,各位看官是不是越越欲试想在项目中引入RabbitMQ去优化现在的使用场景,那么是不是我们部署一个RabbitMQ服务,然后发送消息就高枕无忧了呢?其实在引入一个中间件时,同时伴随着一些问题,如果我们对这些问题了解不够深入或者全面,那恭喜你将进入挖坑选手序列。为了成为一个靠谱的程序员,我们要充分了解引入中间件给我们 项目带来的挑战,才能在之后的应用上从容应对。下面列了下消息中间件中常见的几类问题

  • 消息丢失
  • 消息重复
  • 消息堆积
  • RabbitMQ的可用性保证

之后的文章,我们将逐个去讲解上述问题的解决方案。 下一讲:RabbitMQ消息可靠性传输

参考文档

https://www.rabbitmq.com/ RabbitMQ官方文档

tips:作者个人经验有限,不足之处烦请指正。


推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • Python与R语言在功能和应用场景上各有优势。尽管R语言在统计分析和数据可视化方面具有更强的专业性,但Python作为一种通用编程语言,适用于更广泛的领域,包括Web开发、自动化脚本和机器学习等。对于初学者而言,Python的学习曲线更为平缓,上手更加容易。此外,Python拥有庞大的社区支持和丰富的第三方库,使其在实际应用中更具灵活性和扩展性。 ... [详细]
  • 基于Java和JSP的电子医疗记录管理平台
    随着信息技术的快速发展,各类管理系统已在各行各业得到广泛应用。传统的人工管理模式已逐渐无法满足现代需求。本文介绍了一种基于Java和JSP技术开发的电子医疗记录管理平台,旨在提高医疗行业的信息化水平和管理效率。该平台通过整合先进的数据库技术和Web开发框架,实现了医疗记录的高效存储、查询和管理,为医护人员提供了便捷的操作界面和强大的数据支持。 ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • Java新手求助:如何优雅地向心仪女生索要QQ联系方式(附代码示例与技巧)
    在端午节后的闲暇时光中,我无意间在技术社区里发现了一篇关于如何巧妙地向心仪女生索取QQ联系方式的文章,顿时感到精神焕发。这篇文章详细介绍了源自《啊哈!算法》的方法,不仅图文并茂,还提供了实用的代码示例和技巧,非常适合 Java 新手学习和参考。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 本文作为“实现简易版Spring系列”的第五篇,继前文深入探讨了Spring框架的核心技术之一——控制反转(IoC)之后,将重点转向另一个关键技术——面向切面编程(AOP)。对于使用Spring框架进行开发的开发者来说,AOP是一个不可或缺的概念。了解AOP的背景及其基本原理,对于掌握这一技术至关重要。本文将通过具体示例,详细解析AOP的实现机制,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 从Google Jam落选后,意外收获了一颗“桔子”,重新整理行囊再出发 ... [详细]
  • Django框架下的对象关系映射(ORM)详解
    在Django框架中,对象关系映射(ORM)技术是解决面向对象编程与关系型数据库之间不兼容问题的关键工具。通过将数据库表结构映射到Python类,ORM使得开发者能够以面向对象的方式操作数据库,从而简化了数据访问和管理的复杂性。这种技术不仅提高了代码的可读性和可维护性,还增强了应用程序的灵活性和扩展性。 ... [详细]
  • 在软件开发领域,“池”技术被广泛应用,如数据库连接池、线程池等。本文重点探讨Java中的线程池ThreadPoolExecutor,通过详细解析其内部机制,帮助开发者理解如何高效利用线程池管理任务执行。线程池不仅能够显著减少系统资源的消耗,提高响应速度,还能通过合理的配置,如饱和策略,确保在高负载情况下系统的稳定性和可靠性。文章还将结合实际案例,展示线程池在不同应用场景下的具体实现与优化技巧。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
author-avatar
mobiledu2502870747
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有