热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ主题(Topic)

我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转

  我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发。

  虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
  在我们的日志系统中,我们有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
  这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。
  为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

  1、 主题转发(Topic Exchange)

  发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
  绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
  *可以匹配一个标识符。
  #可以匹配0个或多个标识符

  2、 图解:

   我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:..
  我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
  可以简单的认为:
  Q1对所有的橙色动物感兴趣。
  Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
  一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,所以会被丢弃。
  如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
  另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。
  注:主题类型的转发器非常强大,可以实现其他类型的转发器。
  当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
  当绑定键中不包含任何#与*时,类似direct类型转发器。

  3、 完整的例子

  发送端

1 public class EmitLogTopic
2 {
3
4 private static final String EXCHANGE_NAME = "topic_logs";
5
6 public static void main(String[] argv) throws Exception
7 {
8 // 创建连接和频道
9 ConnectionFactory factory = new ConnectionFactory();
10 factory.setHost("localhost");
11 Connection connection = factory.newConnection();
12 Channel channel = connection.createChannel();
13
14 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
15
16 String[] routing_keys = new String[] { "kernal.info", "cron.warning",
17 "auth.info", "kernel.critical" };
18 for (String routing_key : routing_keys)
19 {
20 String msg = UUID.randomUUID().toString();
21 channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
22 .getBytes());
23 System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
24 }
25
26 channel.close();
27 connection.close();
28 }
29 }

  接收端1:

1 public class ReceiveLogsTopicForKernel
2 {
3
4 private static final String EXCHANGE_NAME = "topic_logs";
5
6 public static void main(String[] argv) throws Exception
7 {
8 // 创建连接和频道
9 ConnectionFactory factory = new ConnectionFactory();
10 factory.setHost("localhost");
11 Connection connection = factory.newConnection();
12 Channel channel = connection.createChannel();
13 // 声明转发器
14 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
15 // 随机生成一个队列
16 String queueName = channel.queueDeclare().getQueue();
17
18 //接收所有与kernel相关的消息
19 channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
20
21 System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
22
23 QueueingConsumer consumer = new QueueingConsumer(channel);
24 channel.basicConsume(queueName, true, consumer);
25
26 while (true)
27 {
28 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
29 String message = new String(delivery.getBody());
30 String routingKey = delivery.getEnvelope().getRoutingKey();
31
32 System.out.println(" [x] Received routingKey = " + routingKey
33 + ",msg = " + message + ".");
34 }
35 }
36 }

  接收端2:

1 public class ReceiveLogsTopicForCritical
2 {
3
4 private static final String EXCHANGE_NAME = "topic_logs";
5
6 public static void main(String[] argv) throws Exception
7 {
8 // 创建连接和频道
9 ConnectionFactory factory = new ConnectionFactory();
10 factory.setHost("localhost");
11 Connection connection = factory.newConnection();
12 Channel channel = connection.createChannel();
13 // 声明转发器
14 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
15 // 随机生成一个队列
16 String queueName = channel.queueDeclare().getQueue();
17
18 // 接收所有与kernel相关的消息
19 channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
20
21 System.out
22 .println(" [*] Waiting for critical messages. To exit press CTRL+C");
23
24 QueueingConsumer consumer = new QueueingConsumer(channel);
25 channel.basicConsume(queueName, true, consumer);
26
27 while (true)
28 {
29 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30 String message = new String(delivery.getBody());
31 String routingKey = delivery.getEnvelope().getRoutingKey();
32
33 System.out.println(" [x] Received routingKey = " + routingKey
34 + ",msg = " + message + ".");
35 }
36 }
37 }

  只接收致命错误的日志消息。

   运行结果:

 [x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
 [x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
 [x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
 [x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

 [*] Waiting for messages about kernel. To exit press CTRL+C
 [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

 [*] Waiting for critical messages. To exit press CTRL+C
 [x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

  可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。

参考博客:http://blog.csdn.net/lmj623565791/article/details/37706355

转:https://www.cnblogs.com/lcngu/p/5932227.html



推荐阅读
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • IhaveconfiguredanactionforaremotenotificationwhenitarrivestomyiOsapp.Iwanttwodiff ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • Windows7 64位系统安装PLSQL Developer的步骤和注意事项
    本文介绍了在Windows7 64位系统上安装PLSQL Developer的步骤和注意事项。首先下载并安装PLSQL Developer,注意不要安装在默认目录下。然后下载Windows 32位的oracle instant client,并解压到指定路径。最后,按照自己的喜好对解压后的文件进行命名和压缩。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • This article discusses the efficiency of using char str[] and char *str and whether there is any reason to prefer one over the other. It explains the difference between the two and provides an example to illustrate their usage. ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
author-avatar
好宝贝蛋_282
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有