热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ主题(Topic)

我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转

  我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发。

  虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
  在我们的日志系统中,我们有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
  这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。
  为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

  1、 主题转发(Topic Exchange)

  发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
  绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
  *可以匹配一个标识符。
  #可以匹配0个或多个标识符

  2、 图解:

   我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:..
  我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
  可以简单的认为:
  Q1对所有的橙色动物感兴趣。
  Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
  一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,所以会被丢弃。
  如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
  另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。
  注:主题类型的转发器非常强大,可以实现其他类型的转发器。
  当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
  当绑定键中不包含任何#与*时,类似direct类型转发器。

  3、 完整的例子

  发送端

1 public class EmitLogTopic
2 {
3
4 private static final String EXCHANGE_NAME = "topic_logs";
5
6 public static void main(String[] argv) throws Exception
7 {
8 // 创建连接和频道
9 ConnectionFactory factory = new ConnectionFactory();
10 factory.setHost("localhost");
11 Connection connection = factory.newConnection();
12 Channel channel = connection.createChannel();
13
14 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
15
16 String[] routing_keys = new String[] { "kernal.info", "cron.warning",
17 "auth.info", "kernel.critical" };
18 for (String routing_key : routing_keys)
19 {
20 String msg = UUID.randomUUID().toString();
21 channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
22 .getBytes());
23 System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
24 }
25
26 channel.close();
27 connection.close();
28 }
29 }

  接收端1:

1 public class ReceiveLogsTopicForKernel
2 {
3
4 private static final String EXCHANGE_NAME = "topic_logs";
5
6 public static void main(String[] argv) throws Exception
7 {
8 // 创建连接和频道
9 ConnectionFactory factory = new ConnectionFactory();
10 factory.setHost("localhost");
11 Connection connection = factory.newConnection();
12 Channel channel = connection.createChannel();
13 // 声明转发器
14 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
15 // 随机生成一个队列
16 String queueName = channel.queueDeclare().getQueue();
17
18 //接收所有与kernel相关的消息
19 channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
20
21 System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
22
23 QueueingConsumer consumer = new QueueingConsumer(channel);
24 channel.basicConsume(queueName, true, consumer);
25
26 while (true)
27 {
28 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
29 String message = new String(delivery.getBody());
30 String routingKey = delivery.getEnvelope().getRoutingKey();
31
32 System.out.println(" [x] Received routingKey = " + routingKey
33 + ",msg = " + message + ".");
34 }
35 }
36 }

  接收端2:

1 public class ReceiveLogsTopicForCritical
2 {
3
4 private static final String EXCHANGE_NAME = "topic_logs";
5
6 public static void main(String[] argv) throws Exception
7 {
8 // 创建连接和频道
9 ConnectionFactory factory = new ConnectionFactory();
10 factory.setHost("localhost");
11 Connection connection = factory.newConnection();
12 Channel channel = connection.createChannel();
13 // 声明转发器
14 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
15 // 随机生成一个队列
16 String queueName = channel.queueDeclare().getQueue();
17
18 // 接收所有与kernel相关的消息
19 channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
20
21 System.out
22 .println(" [*] Waiting for critical messages. To exit press CTRL+C");
23
24 QueueingConsumer consumer = new QueueingConsumer(channel);
25 channel.basicConsume(queueName, true, consumer);
26
27 while (true)
28 {
29 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30 String message = new String(delivery.getBody());
31 String routingKey = delivery.getEnvelope().getRoutingKey();
32
33 System.out.println(" [x] Received routingKey = " + routingKey
34 + ",msg = " + message + ".");
35 }
36 }
37 }

  只接收致命错误的日志消息。

   运行结果:

 [x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
 [x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
 [x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
 [x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

 [*] Waiting for messages about kernel. To exit press CTRL+C
 [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

 [*] Waiting for critical messages. To exit press CTRL+C
 [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

  可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。

参考博客:http://blog.csdn.net/lmj623565791/article/details/37706355

转:https://www.cnblogs.com/lcngu/p/5932227.html



推荐阅读
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 分享一款基于Java开发的经典贪吃蛇游戏实现
    本文介绍了一款使用Java语言开发的经典贪吃蛇游戏的实现。游戏主要由两个核心类组成:`GameFrame` 和 `GamePanel`。`GameFrame` 类负责设置游戏窗口的标题、关闭按钮以及是否允许调整窗口大小,并初始化数据模型以支持绘制操作。`GamePanel` 类则负责管理游戏中的蛇和苹果的逻辑与渲染,确保游戏的流畅运行和良好的用户体验。 ... [详细]
  • 每年,意甲、德甲、英超和西甲等各大足球联赛的赛程表都是球迷们关注的焦点。本文通过 Python 编程实现了一种生成赛程表的方法,该方法基于蛇形环算法。具体而言,将所有球队排列成两列的环形结构,左侧球队对阵右侧球队,首支队伍固定不动,其余队伍按顺时针方向循环移动,从而确保每场比赛不重复。此算法不仅高效,而且易于实现,为赛程安排提供了可靠的解决方案。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 本文介绍了UUID(通用唯一标识符)的概念及其在JavaScript中生成Java兼容UUID的代码实现与优化技巧。UUID是一个128位的唯一标识符,广泛应用于分布式系统中以确保唯一性。文章详细探讨了如何利用JavaScript生成符合Java标准的UUID,并提供了多种优化方法,以提高生成效率和兼容性。 ... [详细]
  • 本地存储组件实现对IE低版本浏览器的兼容性支持 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • PHP预处理常量详解:如何定义与使用常量 ... [详细]
  • QT框架中事件循环机制及事件分发类详解
    在QT框架中,QCoreApplication类作为事件循环的核心组件,为应用程序提供了基础的事件处理机制。该类继承自QObject,负责管理和调度各种事件,确保程序能够响应用户操作和其他系统事件。通过事件循环,QCoreApplication实现了高效的事件分发和处理,使得应用程序能够保持流畅的运行状态。此外,QCoreApplication还提供了多种方法和信号槽机制,方便开发者进行事件的定制和扩展。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 手指触控|Android电容屏幕驱动调试指南
    手指触控|Android电容屏幕驱动调试指南 ... [详细]
  • 寒假作业解析:第三周 2月12日 第7题
    尽快完成之前的练习任务!每日一练2.1 Problem A Laurenty and Shop 的题目要求是选择两条不同的路线以最小化总的等待时间。简要分析:通过对比不同路线的等待时间,可以找到最优解。此问题可以通过动态规划或贪心算法来解决,具体取决于路线的复杂性和约束条件。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
author-avatar
好宝贝蛋_282
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有