在前篇文章中我们创建了一个简单的日志系统,能够广播日志到多个接收者。
在这篇文章中我们打算为日志系统添加一个特征,使它能够仅订阅消息集的子集。例如:我们将能够仅把严重的错误信息直接保存到磁盘,而仍然在控制台打印全部的日志信息。
Bindings
在之前的文章中我们已经创建了bindings,你能够回忆到的代码像下面这样
channel.queueBind(queueName, EXCHANGE_NAME, "")
binding是exchange和queue这两者之间的一种关联关系,它可以简单的理解为:queue对exchange中的消息是感兴趣的。
bindings 能够得到一个额外的参数routingKey,为避免与basic_public参数混淆,我们打算称它为binding关键字,下面我们看下如何利用这个参数创建binding:
channel.queueBind(queueName, EXCHANGE_NAME, "black")
这意味着binding key 依赖于exchange的类型,例如,我们之前使用的fanout
类型的exchange就直接忽略了binding关键字的值。
Direct exchange
在前篇文章中,我们创建的日志系统将全部的消息广播给所有的用户。我们希望将它扩展为允许基于消息自身的程度来过滤他们。比如,我们可能希望将日志写入到磁盘的程序仅接收出错的日志信息,而不在警告日志、信息日志上浪费磁盘空间
我们用过的fanout
类型的exchange,无法给我们许多灵活性,它仅能够做不用思考的就可以直接广播的操作
现在我们将利用direct
类型的exchange代替它。一个direct
类型exchange的路由算法是简单的—一条消息被导航到binding关键字与该消息的路由关键字(routing key)精确匹配的队列。
为了说明这个逻辑,思考下面的设置:
在这个设置中,我们能看到direct
类型的exchange X 绑定了两个队列,第一个队列是绑定了binding关键字orange,第二个队列绑定了两个binding,一个binding关键字black和另一个binding关键字green.
一个路由关键字为orange的被发送到这个exchange中的消息将被导航到队列Q1上, 在这样的设置中路由关键字为black或green的消息将分发到Q2。 其它消息将被丢弃。
Multiple bindings
多个队列使用相同的binding关键字是完全合法的。在我们的例子中我们将在Q1队列和X exchange中间添加一个关键字为black 的binding。由于这个情况,这direct
类型的exchange将表现的像fanout
类型的exhange,将这消息广播到所有匹配的队列(queue)上
一个路由关键字(routing key)为black的消息将被分发到Q1和Q2
发送日志
我们将为我们的日志系统利用这种模式—我们将发送消息到direct
类型的exchange来取代fanout
类型的exchange。我们将提供日志的serverity作为路由关键字(routing key)。通过这种方式,接收消息的程序将能够选择它想接收的消息的severity。让我们先看看如何发送日志信息。
一如既往,首先我们需要创建一个exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct")
准备发送一个消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes())
为了便于演示,我们将假定severity是info、warning、error中的一个
订阅
接收消息的过程将看起来和之前的一样,但有一点不同,我们将为我们感兴趣的每一个severity创建新的binding
String queueName = channel.queueDeclare().getQueue();for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
完整的代码
EmitLogDirect.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class EmitLogDirect {private final static String EXCHANGE_NAME &#61; "direct_logs";public static void main(String[] args) throws java.io.IOException,java.util.concurrent.TimeoutException{ConnectionFactory factory &#61; new ConnectionFactory();factory.setHost("localhost");Connection connection &#61; factory.newConnection();Channel channel &#61; connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity &#61; getSeverity(args);String message &#61; getMessage(args);channel.basicPublish(EXCHANGE_NAME,severity, null, message.getBytes());System.out.println("[x] Sent&#39;"&#43;message&#43;"&#39;");channel.close();connection.close();}private static String getSeverity(String[] args) {if (args.length <1) {return "info";}return args[0];}private static String getMessage(String[] args) {if(args.length <2) {return "Hello World!";}return joinString(args," ", 1);}private static String joinString(String[] args, String delimiter, int startIndex) {int len &#61; args.length;if(len&#61;&#61;0) return "";if(len return "";StringBuilder sbd &#61; new StringBuilder(args[0]);for(int i&#61;1; ireturn sbd.toString();}}
ReceiveLogsDirect.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.lang.InterruptedException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class ReceiveLogsDirect {private final static String EXCHANGE_NAME &#61; "direct_logs";public static void main(String[] args)throws java.io.IOException,java.util.concurrent.TimeoutException,java.lang.InterruptedException {ConnectionFactory factory &#61; new ConnectionFactory();factory.setHost("localhost");Connection connection &#61; factory.newConnection();Channel channel &#61; connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName &#61; channel.queueDeclare().getQueue();if(args.length <1){System.out.println("Usage: ReceiveLogsDirect [info][warning][error]");System.exit(1);}for(String severity : args){channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL&#43;C");channel.basicQos(1);Consumer consumer &#61; new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String message &#61; new String(body,"UTF-8");System.out.println(" [x] Received &#39;"&#43; envelope.getRoutingKey()&#43;"&#39;:&#39;" &#43; message &#43; "&#39;");}};boolean autoAsk &#61; true;channel.basicConsume(queueName, autoAsk, consumer);}private static void doWork(String task) throws InterruptedException{for (char ch: task.toCharArray()) {if (ch&#61;&#61;&#39;.&#39;) Thread.sleep(1000);}}
}
演示效果&#xff1a;
这里简单演示了接收者能够根据routingKey参数的值获取相对应的消息&#xff0c;并没有按照官方文档中的操作将日志保存的磁盘中&#xff0c;有兴趣的读者可自行尝试&#xff01;
关于cmd
窗口执行java文件时依赖类的设置问题
在RabbitMQ之Work Queues这篇文章中我们是通过在cmd
窗口中设置CP
参数的值&#xff0c;但当重新打开cmd
窗口时又需要重新设置&#xff0c;现在为了避免重新设置我们将CP
参数设置为系统参数。配置信息如下&#xff1a;
图(1)
变量名:
CP
变量值:
.;amqp-client-4.0.2.jar;slf4j-api-1.7.21.jar;slf4j-simple-1.7.22.jar