热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbit路由源码分析

三种路由模式rabbitMq有三种交换机路由模式https:my.oschina.nethuangcongmin12blog885345任何发送到TopicExchange的消息

三种路由模式

rabbitMq有三种交换机路由模式 https://my.oschina.net/huangcongmin12/blog/885345


 
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。


数据结构

每一个队列有一个consumer来维护

consumer的数据结构是

queue: 消息

queues:队列

 


源码

接受消息 

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONARChannel channel = consumer.getChannel();for (int i = 0; i //取出消息出来,consumer是一个封装,对所有连接信息,channel的封装logger.trace("Waiting for message from consumer.");Message message = consumer.nextMessage(this.receiveTimeout);if (message == null) {break;}try {executeListener(channel, message);}catch (ImmediateAcknowledgeAmqpException e) {if (this.logger.isDebugEnabled()) {this.logger.debug("User requested ack for failed delivery: "+ message.getMessageProperties().getDeliveryTag());}break;}catch (Throwable ex) { //NOSONARif (causeChainHasImmediateAcknowledgeAmqpException(ex)) {if (this.logger.isDebugEnabled()) {this.logger.debug("User requested ack for failed delivery: "+ message.getMessageProperties().getDeliveryTag());}break;}if (this.transactionManager != null) {if (this.transactionAttribute.rollbackOn(ex)) {RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());if (resourceHolder != null) {consumer.clearDeliveryTags();}else {/** If we don't actually have a transaction, we have to roll back* manually. See prepareHolderForRollback().*/consumer.rollbackOnExceptionIfNecessary(ex);}throw ex; // encompassing transaction will handle the rollback.}else {if (this.logger.isDebugEnabled()) {this.logger.debug("No rollback for " + ex);}break;}}else {consumer.rollbackOnExceptionIfNecessary(ex);throw ex;}}}return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));}

 

consumer处理消息投递

@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {if (logger.isDebugEnabled()) {logger.debug("Storing delivery for consumerTag: '"+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "+ BlockingQueueConsumer.this);}try {if (BlockingQueueConsumer.this.abortStarted > 0) {if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body),BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {RabbitUtils.setPhysicalCloseRequired(getChannel(), true);// Defensive - should never happenBlockingQueueConsumer.this.queue.clear();getChannel().basicNack(envelope.getDeliveryTag(), true, true);getChannel().basicCancel(consumerTag);try {getChannel().close();}catch (TimeoutException e) {// no-op}}}else {BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));}}catch (InterruptedException e) {Thread.currentThread().interrupt();}}

 

 


推荐阅读
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • Spring Batch 异常处理与任务限制优化策略 ... [详细]
  • 通过整合JavaFX与Swing,我们成功地将现有的Swing应用程序组件进行了现代化改造。此次升级不仅提升了用户界面的美观性和交互性,还确保了与原有Swing应用程序的无缝集成,为开发高质量的Java桌面应用提供了坚实的基础。 ... [详细]
  • 如何在Spark数据排序过程中有效避免内存溢出(OOM)问题
    本文深入探讨了在使用Spark进行数据排序时如何有效预防内存溢出(OOM)问题。通过具体的代码示例,详细阐述了优化策略和技术手段,为读者在实际工作中遇到类似问题提供了宝贵的参考和指导。 ... [详细]
  • 深入解析 Django 中用户模型的自定义方法与技巧 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • 开发心得:深入探讨Servlet、Dubbo与MyBatis中的责任链模式应用
    开发心得:深入探讨Servlet、Dubbo与MyBatis中的责任链模式应用 ... [详细]
  • 本项目在Java Maven框架下,利用POI库实现了Excel数据的高效导入与导出功能。通过优化数据处理流程,提升了数据操作的性能和稳定性。项目已发布至GitHub,当前最新版本为0.0.5。该项目不仅适用于小型应用,也可扩展用于大型企业级系统,提供了灵活的数据管理解决方案。GitHub地址:https://github.com/83945105/holygrail,Maven坐标:`com.github.83945105:holygrail:0.0.5`。 ... [详细]
  • 本文介绍了如何通过掌握 IScroll 技巧来实现流畅的上拉加载和下拉刷新功能。首先,需要按正确的顺序引入相关文件:1. Zepto;2. iScroll.js;3. scroll-probe.js。此外,还提供了完整的代码示例,可在 GitHub 仓库中查看。通过这些步骤,开发者可以轻松实现高效、流畅的滚动效果,提升用户体验。 ... [详细]
  • 在稀疏直接法视觉里程计中,通过优化特征点并采用基于光度误差最小化的灰度图像线性插值技术,提高了定位精度。该方法通过对空间点的非齐次和齐次表示进行处理,利用RGB-D传感器获取的3D坐标信息,在两帧图像之间实现精确匹配,有效减少了光度误差,提升了系统的鲁棒性和稳定性。 ... [详细]
  • 在高清节目的高比特率传输过程中,使用外接USB硬盘进行时间平移(timeshift)时,出现了性能不足和流数据丢失的问题。通过深入研究,我们发现通过对图像组(GOP)和图像头(I-frame)的精确定位技术进行优化,可以显著提升系统的性能和稳定性。本研究提出了改进的图像组与图像头定位算法,有效减少了数据丢失,提高了流媒体传输的效率和质量。 ... [详细]
  • 本文介绍了一种简化版的在线购物车系统,重点探讨了用户登录和购物流程的设计与实现。该系统通过优化界面交互和后端逻辑,提升了用户体验和操作便捷性。具体实现了用户注册、登录验证、商品浏览、加入购物车以及订单提交等功能,旨在为用户提供高效、流畅的购物体验。 ... [详细]
  • 深入解析:RKHunter与AIDE在入侵检测中的应用与优势
    本文深入探讨了RKHunter与AIDE在入侵检测领域的应用及其独特优势。通过对比分析,详细阐述了这两种工具在系统完整性验证、恶意软件检测及日志文件监控等方面的技术特点和实际效果,为安全管理人员提供了有效的防护策略建议。 ... [详细]
author-avatar
woshishuia小姐
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有