热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ入门(四)——topic交换器

今天我们来学习最后一个交换器类型:topic。direct是放到exchange绑定的一个queue里,fanout是放到exchange绑定的所有queue里。那有没有放到exc

今天我们来学习最后一个交换器类型:topic。direct是放到exchange绑定的一个queue里,fanout是放到exchange绑定的所有queue里。那有没有放到exchange绑定的一部分queue里,或者多个routing key可以路由到一个queue里呢,那就要用到topic类型的exchange。

技术分享



我们先来看看多个routing key如何路由到一个queue里。假设我们有三个系统,在出错的时候会写日志,并会把日志发送到RabbitMQ,路由键为:系统名.error。在RabbitMQ里我们想把所有的error信息放到一个queue里面,就可以使用如下的方式:

package com.jaeger.exchange.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Producer {
	private static final String MY_EXCHANGE_NAME = "MyExchange";
	// 三个不同系统发送日志时使用的路由键
	private static final String SYS1_ERROR_ROUTING_KEY = "sys1.error";
	private static final String SYS2_ERROR_ROUTING_KEY = "sys2.error";
	private static final String SYS3_ERROR_ROUTING_KEY = "sys3.error";
	private static final String MY_QUEUE_NAME = "MyQueue";
	private static final String TOPIC = "topic";
	private static final String HOST = "172.19.64.21";
	private static final String USER = "jaeger";
	private static final String PASSWORD = "root";
	private static final int PORT = 5672;

	@Test
	public void createExchangeAndQueue() throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		// 创建一个topic类型的exchange
		channel.exchangeDeclare(MY_EXCHANGE_NAME, TOPIC);
		// 创建一个queue
		channel.queueDeclare(MY_QUEUE_NAME, false, false, false, null);
		// 创建一个routing key,把exchange和queue绑定到一起,但这里的routing key并不是一个
		// 具体的名称,而是可以匹配所有以.error结尾的routing key
		channel.queueBind(MY_QUEUE_NAME, MY_EXCHANGE_NAME, "*.error");
		channel.close();
		connection.close();
	}

	@Test
	public void produce() throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String message = "Hello 世界!";
		/*
		向RabbitMQ发送消息。我们这里指定了exchange和3个不同的routing key的名称,RabbitMQ会去找有没有叫这个名称的exchange,
		如果找到了又发现这个exchange是topic类型,就会尝试用指定的routing key去匹配exchange绑定的routing key,
		凡是匹配到的routing key的queue都会收到消息。
		*/
		channel.basicPublish(MY_EXCHANGE_NAME, SYS1_ERROR_ROUTING_KEY, null, message.getBytes("utf-8"));
		channel.basicPublish(MY_EXCHANGE_NAME, SYS2_ERROR_ROUTING_KEY, null, message.getBytes("utf-8"));
		channel.basicPublish(MY_EXCHANGE_NAME, SYS3_ERROR_ROUTING_KEY, null, message.getBytes("utf-8"));
		System.out.println("Sent ‘" + message + "‘");
		channel.close();
		connection.close();
	}
	
	@Test
	public void consume() throws IOException, TimeoutException, InterruptedException{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received ‘" + message + "‘");
			}
		};
		channel.basicConsume(MY_QUEUE_NAME, true, consumer);
		Thread.sleep(1000);
	}
}

运行createExchangeAndQueue,发现exchange上绑定了一个*.error的路由键:

技术分享

技术分享

技术分享

然后运行produce方法,向RabbitMQ发送消息:

技术分享可以看到3条消息进入了同一个queue。最后运行consume来消费消息:

技术分享

技术分享




上面我们展示了如何让多个routing key路由到同一个queue。那有没有办法让一个routing key路由到多个queue呢?其实用topic类型的exchange是完全可以做到的。比如,我们的系统出错后会根据不同的错误级别生成error_levelX.log日志,我们在后台首先要把所有的error保存在一个总的queue里,然后再按level分别存放在不同的queue。我们把上面的代码修改下:

package com.jaeger.exchange.topic;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Producer {
	private static final String MY_EXCHANGE = "MyExchange";
	private static final String SYS_ERROR_ROUTING_KEY = "error.level1.log";
	private static final String SYS_ERROR_QUEUE = "ErrorQueue";
	private static final String SYS_LEVEL1_ERROR_QUEUE = "Level1ErrorQueue";
	private static final String SYS_LEVEL2_ERROR_QUEUE = "Level2ErrorQueue";
	private static final String TOPIC = "topic";
	private static final String HOST = "172.19.64.21";
	private static final String USER = "jaeger";
	private static final String PASSWORD = "root";
	private static final int PORT = 5672;

	@Test
	public void createExchangeAndQueue() throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		// 创建一个topic类型的exchange
		channel.exchangeDeclare(MY_EXCHANGE, TOPIC);
		// 创建三个存放error的queue
		channel.queueDeclare(SYS_ERROR_QUEUE, false, false, false, null);
		channel.queueDeclare(SYS_LEVEL1_ERROR_QUEUE, false, false, false, null);
		channel.queueDeclare(SYS_LEVEL2_ERROR_QUEUE, false, false, false, null);
		// 创建三个routing key,把exchange和三个queue绑定到一起
		channel.queueBind(SYS_ERROR_QUEUE, MY_EXCHANGE, "error.*.log");
		channel.queueBind(SYS_LEVEL1_ERROR_QUEUE, MY_EXCHANGE, "error.level1.log");
		channel.queueBind(SYS_LEVEL2_ERROR_QUEUE, MY_EXCHANGE, "error.level2.log");
		channel.close();
		connection.close();
	}

	@Test
	public void produce() throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String message = "Hello 世界!";
		/*
		向RabbitMQ发送消息。我们这里指定了exchange和一个routing key的名称,RabbitMQ会去找有没有叫这个名称的exchange,
		如果找到了又发现这个exchange是topic类型,就会尝试用指定的routing key去匹配exchange绑定的routing key,
		凡是匹配到的routing key的queue都会收到消息。
		*/
		channel.basicPublish(MY_EXCHANGE, SYS_ERROR_ROUTING_KEY, null, message.getBytes("utf-8"));
		System.out.println("Sent ‘" + message + "‘");
		channel.close();
		connection.close();
	}
	
	@Test
	public void consume() throws IOException, TimeoutException, InterruptedException{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(HOST);
		connectionFactory.setUsername(USER);
		connectionFactory.setPassword(PASSWORD);
		connectionFactory.setPort(PORT);
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
					byte[] body) throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received ‘" + message + "‘");
			}
		};
		channel.basicConsume(SYS_ERROR_QUEUE, true, consumer);
		Thread.sleep(1000);
	}
}

运行createExchangeAndQueue方法,创建3个queue:

技术分享

技术分享

技术分享

然后运行produce方法向RabbitMQ发送消息。可以看到消息只进入了ErrorQueue和Level1ErrorQueue两个队列:

技术分享最后运行consume来消费ErrorQueue队列里的消息:

技术分享



到此为止,RabbitMQ常用的3类exchange全部介绍完了。对于topic有一点需要注意,就是它的匹配规则。topic的匹配规则是基于标识符的,用.分隔。比如error.level1.log中,error、level1和log都是标识符。

*只能匹配一个标识符,比如error.*.log只能匹配error.level1.log或者error.high.log,而不能匹配error.log或者error.level1.high.log。

#可以匹配0个或多个标识符,如error.#.log就可以匹配error.log或者error.level1.high.log。


本文出自 “銅鑼衛門” 博客,请务必保留此出处http://jaeger.blog.51cto.com/11064196/1763091

RabbitMQ入门(四) —— topic交换器


推荐阅读
  • 本文深入解析了 Apache 配置文件 `httpd.conf` 和 `.htaccess` 的优化方法,探讨了如何通过合理配置提升服务器性能和安全性。文章详细介绍了这两个文件的关键参数及其作用,并提供了实际应用中的最佳实践,帮助读者更好地理解和运用 Apache 配置。 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 本文深入探讨了 iOS 开发中 `int`、`NSInteger`、`NSUInteger` 和 `NSNumber` 的应用与区别。首先,我们将详细介绍 `NSNumber` 类型,该类用于封装基本数据类型,如整数、浮点数等,使其能够在 Objective-C 的集合类中使用。通过分析这些类型的特性和应用场景,帮助开发者更好地理解和选择合适的数据类型,提高代码的健壮性和可维护性。苹果官方文档提供了更多详细信息,可供进一步参考。 ... [详细]
  • 解决基于XML配置的MyBatis在Spring整合中出现“无效绑定语句(未找到):com.music.dao.MusicDao.findAll”问题的方法
    在将Spring与MyBatis进行整合时,作者遇到了“无效绑定语句(未找到):com.music.dao.MusicDao.findAll”的问题。该问题主要出现在使用XML文件配置DAO层的情况下,而注解方式配置则未出现类似问题。作者详细分析了两个配置文件之间的差异,并最终找到了解决方案。本文将详细介绍问题的原因及解决方法,帮助读者避免类似问题的发生。 ... [详细]
  • 基于Node.js的高性能实时消息推送系统通过集成Socket.IO和Express框架,实现了高效的高并发消息转发功能。该系统能够支持大量用户同时在线,并确保消息的实时性和可靠性,适用于需要即时通信的应用场景。 ... [详细]
  • 新年伊始,正是学习的最佳时机。本文全面解析了CK1957-Zookeeper的核心概念与实践技巧,旨在帮助初学者快速掌握这一深度学习工具。通过详细的理论讲解和实际操作示例,读者可以更好地理解Zookeeper的工作原理及其在分布式系统中的应用。无论是新手还是有一定基础的学习者,都能从中受益匪浅。 ... [详细]
  • Python学习:环境配置与安装指南
    Python作为一种跨平台的编程语言,适用于Windows、Linux和macOS等多种操作系统。为了确保本地已成功安装Python,用户可以通过终端或命令行界面输入`python`或`python3`命令进行验证。此外,建议使用虚拟环境管理工具如`venv`或`conda`,以便更好地隔离不同项目依赖,提高开发效率。 ... [详细]
  • jQuery学习笔记:深入理解事件委派(2014年8月3日)
    在jQuery中,事件委托机制主要通过`closest()`方法实现。该方法用于查找与指定选择器匹配的最近祖先元素,从当前元素开始逐级向上遍历DOM树。这一技术不仅提高了代码的效率,还能有效处理动态生成的元素。参考资料:jQuery遍历方法详解。 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • JVM参数设置与命令行工具详解
    JVM参数配置与命令行工具的深入解析旨在优化系统性能,通过合理设置JVM参数,确保在高吞吐量的前提下,有效减少垃圾回收(GC)的频率,进而降低系统停顿时间,提升服务的稳定性和响应速度。此外,本文还将详细介绍常用的JVM命令行工具,帮助开发者更好地监控和调优JVM运行状态。 ... [详细]
  • 通过优化模板消息机制,本研究提出了一种高效的信息化推送方案。该方案利用获取的访问令牌(access token)和指定的模板ID,实现了精准且快速的信息推送,显著提升了用户体验和信息传递效率。具体实现中,通过调用相关API接口,确保了消息的准确性和及时性,为用户提供更加便捷的服务。 ... [详细]
  • 在ASP.NET MVC项目中,通过实战解决了Ajax请求500错误及多表数据查询的问题。具体而言,将页面分为两个部分,用户点击右侧导航栏时,通过Ajax请求动态加载数据,并在右侧显示相应的页面内容。最初尝试使用Partial Action方法,但遇到了500错误。通过详细排查和调试,最终成功解决了这一问题,并实现了预期功能。此外,还优化了多表数据查询的性能,确保系统的高效运行。 ... [详细]
  • 尽管许多人认为跑步是一项简单的运动,但实际上它涉及诸多专业知识。不正确的跑步方式不仅会降低锻炼效果,还可能引发伤害。例如,穿着不合脚或过于陈旧的跑鞋,会导致足部支撑不足,增加受伤风险。此外,跑步姿势不当、热身不足、过度训练等问题也同样值得关注。本文将详细介绍七大常见跑步误区,并提供专业的改进建议,帮助跑者避免这些问题,提高运动效率和安全性。 ... [详细]
  • 在第六章中,我们将深入探讨MySQL中的多表查询技术,包括联结查询和子查询。联结查询通过将两个或多个表进行连接,基于连接条件生成结果集。常见的联结类型有内联结、外联结和全外联结。交叉联结(CROSS JOIN)虽然使用较少,但其原理是生成所有可能的组合,类似于笛卡尔积的概念。此外,子查询则是在一个查询语句中嵌套另一个查询,用于获取更复杂的数据集。本章将通过实例详细讲解这些查询方法的应用和优化技巧。 ... [详细]
author-avatar
hanhff
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有