热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

javarabbitmqtopic_rabbitmq+java入门(五)Topic

参考:http:www.rabbitmq.comtutorialstutorial-five-java.html源码:https:github.comz

参考:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

源码:https://github.com/zuzhaoyue/JavaDemo

主题

(使用Java客户端)

先决条件

本教程假定RabbitMQ在标准端口(5672)上的本地主机上安装并运行。如果您使用不同的主机,端口或证书,则连接设置需要进行调整。

在教程四中,我们改进了日志记录系统。我们没有使用只有虚拟广播的fanout交换机,而是使用了direct交换机,并获得了选择性接收日志的可能性。

尽管使用直接交换改进了我们的系统,但它仍然有局限性 - 它不能根据多个规则进行路由。

在我们的日志系统中,我们可能不仅需要根据严重等级来订阅日志,还要根据发布日志的来源进行订阅。您可能从syslogunix工具知道这个概念,该工具根据严重性(info/warn/error...)和工具(auth / cron / kern ...)来路由日志。

这会给我们很大的灵活性 - 我们可能想听取来自'cron'的严重错误,而且还听取来自'kern'的所有日志。

为了在我们的日志系统中实现这一点,我们需要了解更复杂的topic交换。

Topic exchange

发送到topic exchange的消息不是一个随意的routing_key- 它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定了与该消息相关的一些功能。一些有效的路由键例子如下:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。只要您愿意,路由键中可以有多少个字,最多255个字节。

绑定键也必须是相同的形式。topic交换背后的逻辑类似于direct exchange- 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。绑定键有两个重要的特殊用法:

*(星号)可以代替一个单词。

#(散列)可以替代零个或多个单词。

在一个例子中解释这个很简单:

df65347f930f4fba0f2545f74fa05edd.png

在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的路由键发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“..”。

我们创建了三个绑定:Q1绑定了绑定键“* .orange。*”,Q2绑定了“*。*。rabbit”和“lazy。#”。

这些绑定可以概括为:

Q1对所有的橙色动物都感兴趣。

Q2希望听到关于兔子的一切,以及关于懒惰的一切。

将路由键设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也会传送到他们两个。另一方面,“quick.orange.fox”只会进入第一个队列,而“lazy.brown.fox”只会进入第二个队列。“lazy.pink.rabbit”只会传递到第二个队列一次,即使第二个队列匹配了两个绑定。“quick.brown.fox”不匹配任何绑定,因此将被丢弃。

如果我们违反我们的规则并发送带有一个或四个单词的消息,如“orange”或“quick.orange.male.rabbit”,这些消息将不匹配任何绑定,于是会被丢失。

另一方面,“lazy.orange.male.rabbit”即使有四个单词,也会匹配最后一个绑定,并将传递到第二个队列。

topic exchange

topic exchage功能强大,可以像其他exchange一样行事。

当使用“#”(hash)绑定键绑定队列时,它将接收所有消息,而不管路由密钥如何 - 就像在fanout exchange中一样。

当在绑定中没有使用特殊字符“*”(星号)和“#”(hash)时,topic将像direct一样。

把以上放在一起

我们将在我们的日志系统中使用topic。我们首先假定日志的路由键有两个词:“。”。

代码几乎与的代码相同 。

生产者EmitLogTopic.java代码如下:

//package rmq.topics;

/*** Created by zuzhaoyue on 18/5/17.*/

importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Channel;public classEmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static voidmain(String[] argv) {

System.out.println("参数是:" +argv.toString());

Connection connection= null;

Channel channel= null;try{

ConnectionFactory factory= newConnectionFactory();

factory.setHost("localhost");

connection=factory.newConnection();

channel=connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String routingKey=getRouting(argv);

String message=getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey,null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

}catch(Exception e) {

e.printStackTrace();

}finally{if (connection != null) {try{

connection.close();

}catch(Exception ignore) {}

}

}

}//若没有参数 ,则返回anoymous.info//若有参数 ,则返回参数第一个

private staticString getRouting(String[] strings){if (strings.length <1)return "anonymous.info";return strings[0];

}//若参数个数小于2,则返回hello world,否则返回joinstring()相应的值

private staticString getMessage(String[] strings){if (strings.length <2)return "Hello World!";return joinStrings(strings, " ", 1);

}//返回输入的数组中从startindex开始的值,这些值以delimeter为分隔符。

private static String joinStrings(String[] strings, String delimiter, intstartIndex) {int length &#61;strings.length;if (length &#61;&#61; 0 ) return "";if (length

StringBuilder words&#61; newStringBuilder(strings[startIndex]);for (int i &#61; startIndex &#43; 1; i

words.append(delimiter).append(strings[i]);

}returnwords.toString();

}

}

消费者ReceiveLogsTopic.java代码如下&#xff1a;

//package rmq.topics;

/*** Created by zuzhaoyue on 18/5/17.*/

import com.rabbitmq.client.*;importjava.io.IOException;public classReceiveLogsTopic {private static final String EXCHANGE_NAME &#61; "topic_logs";public static void main(String[] argv) throwsException {

ConnectionFactory factory&#61; newConnectionFactory();

factory.setHost("localhost");

Connection connection&#61;factory.newConnection();

Channel channel&#61;connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String queueName&#61;channel.queueDeclare().getQueue();if (argv.length <1) {

System.err.println("Usage: ReceiveLogsTopic [binding_key]...");

System.exit(1);

}for(String bindingKey : argv) {

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

}

System.out.println(" [*] Waiting for messages. To exit press CTRL&#43;C");

Consumer consumer&#61; newDefaultConsumer(channel) {

&#64;Overridepublic voidhandleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties,byte[] body) throwsIOException {

String message&#61; new String(body, "UTF-8");

System.out.println(" [x] Received &#39;" &#43; envelope.getRoutingKey() &#43; "&#39;:&#39;" &#43; message &#43; "&#39;");

}

};

channel.basicConsume(queueName,true, consumer);

}

}

测试

1.编译

javac -cp /data/amqp-client-4.2.0.jar EmitLogTopic.java    ReceiveLogsTopic.java

2.执行

1)启动消费者

打开三个窗口&#xff0c;分别输入以下命令&#xff1a;

第一个窗口输入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic ""(表示接收所有的消息)

第二个窗口输入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic "*.critical"(表示接收后缀为critical的消息)

第三个窗口输入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic "kern.*" "*.critical"(表示接收前缀为kern和后缀为critical的消息)

2)启动生产者

依次输入以下命令

java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "aa.critical"

java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "kern.0"

java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "a"

观察消费者的打印情况&#xff0c;发现已经按照不同的规则进行了接收&#xff1a;

9079ad6975421abbd9835fcbc66a9912.png

调试成功~



推荐阅读
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • 开发笔记:Java是如何读取和写入浏览器Cookies的
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Java是如何读取和写入浏览器Cookies的相关的知识,希望对你有一定的参考价值。首先我 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • EzPP 0.2发布,新增YAML布局渲染功能
    EzPP发布了0.2.1版本,新增了YAML布局渲染功能,可以将YAML文件渲染为图片,并且可以复用YAML作为模版,通过传递不同参数生成不同的图片。这个功能可以用于绘制Logo、封面或其他图片,让用户不需要安装或卸载Photoshop。文章还提供了一个入门例子,介绍了使用ezpp的基本渲染方法,以及如何使用canvas、text类元素、自定义字体等。 ... [详细]
  • 十大经典排序算法动图演示+Python实现
    本文介绍了十大经典排序算法的原理、演示和Python实现。排序算法分为内部排序和外部排序,常见的内部排序算法有插入排序、希尔排序、选择排序、冒泡排序、归并排序、快速排序、堆排序、基数排序等。文章还解释了时间复杂度和稳定性的概念,并提供了相关的名词解释。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • 本文介绍了pack布局管理器在Perl/Tk中的使用方法及注意事项。通过调用pack()方法,可以控制部件在显示窗口中的位置和大小。同时,本文还提到了在使用pack布局管理器时,应注意将部件分组以便在水平和垂直方向上进行堆放。此外,还介绍了使用Frame部件或Toplevel部件来组织部件在窗口内的方法。最后,本文强调了在使用pack布局管理器时,应避免在中间切换到grid布局管理器,以免造成混乱。 ... [详细]
  • 本文介绍了在C#中SByte类型的GetHashCode方法,该方法用于获取当前SByte实例的HashCode。给出了该方法的语法和返回值,并提供了一个示例程序演示了该方法的使用。 ... [详细]
  • Google Play推出全新的应用内评价API,帮助开发者获取更多优质用户反馈。用户每天在Google Play上发表数百万条评论,这有助于开发者了解用户喜好和改进需求。开发者可以选择在适当的时间请求用户撰写评论,以获得全面而有用的反馈。全新应用内评价功能让用户无需返回应用详情页面即可发表评论,提升用户体验。 ... [详细]
author-avatar
流水无痕8676
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有