热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ之Routing

在前篇文章中我们创建了一个简单的日志系统,能够广播日志到多个接收者。在这篇文章中我们打算为日志系统添加一个特征,使它能够仅订阅消息集的子集。例如&#x

在前篇文章中我们创建了一个简单的日志系统,能够广播日志到多个接收者。

在这篇文章中我们打算为日志系统添加一个特征,使它能够仅订阅消息集的子集。例如:我们将能够仅把严重的错误信息直接保存到磁盘,而仍然在控制台打印全部的日志信息。

Bindings

在之前的文章中我们已经创建了bindings,你能够回忆到的代码像下面这样

channel.queueBind(queueName, EXCHANGE_NAME, "");

binding是exchange和queue这两者之间的一种关联关系,它可以简单的理解为:queue对exchange中的消息是感兴趣的。

bindings 能够得到一个额外的参数routingKey,为避免与basic_public参数混淆,我们打算称它为binding关键字,下面我们看下如何利用这个参数创建binding:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

这意味着binding key 依赖于exchange的类型,例如,我们之前使用的fanout类型的exchange就直接忽略了binding关键字的值。

Direct exchange

在前篇文章中,我们创建的日志系统将全部的消息广播给所有的用户。我们希望将它扩展为允许基于消息自身的程度来过滤他们。比如,我们可能希望将日志写入到磁盘的程序仅接收出错的日志信息,而不在警告日志、信息日志上浪费磁盘空间

我们用过的fanout类型的exchange,无法给我们许多灵活性,它仅能够做不用思考的就可以直接广播的操作

现在我们将利用direct类型的exchange代替它。一个direct类型exchange的路由算法是简单的—一条消息被导航到binding关键字与该消息的路由关键字(routing key)精确匹配的队列。

为了说明这个逻辑,思考下面的设置:
这里写图片描述

在这个设置中,我们能看到direct类型的exchange X 绑定了两个队列,第一个队列是绑定了binding关键字orange,第二个队列绑定了两个binding,一个binding关键字black和另一个binding关键字green.

一个路由关键字为orange的被发送到这个exchange中的消息将被导航到队列Q1上, 在这样的设置中路由关键字为blackgreen的消息将分发到Q2。 其它消息将被丢弃。

Multiple bindings
这里写图片描述

多个队列使用相同的binding关键字是完全合法的。在我们的例子中我们将在Q1队列和X exchange中间添加一个关键字为black 的binding。由于这个情况,这direct类型的exchange将表现的像fanout类型的exhange,将这消息广播到所有匹配的队列(queue)上
一个路由关键字(routing key)为black的消息将被分发到Q1和Q2

发送日志

我们将为我们的日志系统利用这种模式—我们将发送消息到direct类型的exchange来取代fanout类型的exchange。我们将提供日志的serverity作为路由关键字(routing key)。通过这种方式,接收消息的程序将能够选择它想接收的消息的severity。让我们先看看如何发送日志信息。

一如既往,首先我们需要创建一个exchange

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

准备发送一个消息

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了便于演示,我们将假定severityinfowarningerror中的一个

订阅

接收消息的过程将看起来和之前的一样,但有一点不同,我们将为我们感兴趣的每一个severity创建新的binding

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

完整的代码

这里写图片描述

EmitLogDirect.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class EmitLogDirect {private final static String EXCHANGE_NAME &#61; "direct_logs";public static void main(String[] args) throws java.io.IOException,java.util.concurrent.TimeoutException{ConnectionFactory factory &#61; new ConnectionFactory();factory.setHost("localhost");Connection connection &#61; factory.newConnection();Channel channel &#61; connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity &#61; getSeverity(args);String message &#61; getMessage(args);channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());System.out.println("[x] Sent&#39;"&#43;message&#43;"&#39;");channel.close();connection.close();}private static String getSeverity(String[] args) {if (args.length <1) {return "info";}return args[0];}private static String getMessage(String[] args) {if(args.length <2) {return "Hello World!";}return joinString(args," ", 1);}private static String joinString(String[] args, String delimiter, int startIndex) {int len &#61; args.length;if(len&#61;&#61;0) return "";if(len return "";StringBuilder sbd &#61; new StringBuilder(args[0]);for(int i&#61;1; ireturn sbd.toString();}}

ReceiveLogsDirect.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.lang.InterruptedException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class ReceiveLogsDirect {private final static String EXCHANGE_NAME &#61; "direct_logs";public static void main(String[] args)throws java.io.IOException,java.util.concurrent.TimeoutException,java.lang.InterruptedException {ConnectionFactory factory &#61; new ConnectionFactory();factory.setHost("localhost");Connection connection &#61; factory.newConnection();Channel channel &#61; connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName &#61; channel.queueDeclare().getQueue();if(args.length <1){System.out.println("Usage: ReceiveLogsDirect [info][warning][error]");System.exit(1);}for(String severity : args){channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL&#43;C");channel.basicQos(1);Consumer consumer &#61; new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String message &#61; new String(body,"UTF-8");System.out.println(" [x] Received &#39;"&#43; envelope.getRoutingKey()&#43;"&#39;:&#39;" &#43; message &#43; "&#39;");}};boolean autoAsk &#61; true;channel.basicConsume(queueName, autoAsk, consumer);}private static void doWork(String task) throws InterruptedException{for (char ch: task.toCharArray()) {if (ch&#61;&#61;&#39;.&#39;) Thread.sleep(1000);}}
}

演示效果&#xff1a;

这里简单演示了接收者能够根据routingKey参数的值获取相对应的消息&#xff0c;并没有按照官方文档中的操作将日志保存的磁盘中&#xff0c;有兴趣的读者可自行尝试&#xff01;
这里写图片描述

关于cmd窗口执行java文件时依赖类的设置问题

在RabbitMQ之Work Queues这篇文章中我们是通过在cmd窗口中设置CP参数的值&#xff0c;但当重新打开cmd窗口时又需要重新设置&#xff0c;现在为了避免重新设置我们将CP参数设置为系统参数。配置信息如下&#xff1a;

这里写图片描述

图(1)


变量名:
CP

变量值:
.;amqp-client-4.0.2.jar;slf4j-api-1.7.21.jar;slf4j-simple-1.7.22.jar

推荐阅读
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 深入解析十大经典排序算法:动画演示、原理分析与代码实现
    本文深入探讨了十种经典的排序算法,不仅通过动画直观展示了每种算法的运行过程,还详细解析了其背后的原理与机制,并提供了相应的代码实现,帮助读者全面理解和掌握这些算法的核心要点。 ... [详细]
  • Prim算法在处理稠密图时表现出色,尤其适用于边数远多于顶点数的情形。传统实现的时间复杂度为 \(O(n^2)\),但通过引入优先队列进行优化,可以在点数为 \(m\)、边数为 \(n\) 的情况下显著降低时间复杂度,提高算法效率。这种优化方法不仅能够加速最小生成树的构建过程,还能在大规模数据集上保持良好的性能表现。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
  • 魅族Flyme 7正式发布:全面解析与亮点介绍
    在22日晚的发布会上,魅族不仅推出了m15、15和15 Plus三款新机型,还正式发布了全新的Flyme 7系统。Flyme 7在保持流畅体验的基础上,进一步增强了功能性和实用性,为用户带来更加丰富的使用体验。首批适配包已准备就绪,将逐步推送给现有设备。 ... [详细]
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • Java服务问题快速定位与解决策略全面指南 ... [详细]
  • 本文作为“实现简易版Spring系列”的第五篇,继前文深入探讨了Spring框架的核心技术之一——控制反转(IoC)之后,将重点转向另一个关键技术——面向切面编程(AOP)。对于使用Spring框架进行开发的开发者来说,AOP是一个不可或缺的概念。了解AOP的背景及其基本原理,对于掌握这一技术至关重要。本文将通过具体示例,详细解析AOP的实现机制,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 西北工业大学作为陕西省三所985和211高校之一,虽然在农业和林业领域不如某些顶尖院校,但在航空航天领域的实力尤为突出。该校的计算机科学专业在科研和教学方面也具有显著优势,是考研的理想选择。 ... [详细]
  • 如何正确配置与使用日志组件:Log4j、SLF4J及Logback的连接与整合方法
    在当前的软件开发实践中,无论是开源项目还是日常工作中,日志框架都是不可或缺的工具之一。本文详细探讨了如何正确配置与使用Log4j、SLF4J及Logback这三个流行的日志组件,并深入解析了它们之间的连接与整合方法,旨在帮助开发者高效地管理和优化日志记录流程。 ... [详细]
  • 本文深入探讨了 `ExpressionChangedAfterItHasBeenCheckedError` 错误的原因及其解决方案。通过分析 Angular 的变更检测机制,详细解释了该错误的发生条件,并提供了多种有效的应对策略,帮助开发者在实际开发中避免这一常见问题。 ... [详细]
  • 本文深入剖析了ScheduledThreadPoolExecutor的并发执行机制及其源代码,详细解读了该线程池如何在指定延时或定期执行任务,探讨了其内部的工作原理和优化策略,为开发者提供了宝贵的参考和实践指导。 ... [详细]
author-avatar
龙争虎斗石榴agj
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有