热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

消息队列RabbitMQ总结(一)

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP协议,RabbitMQ高性能的原因之一是因为基于Erlang语言开发,Erlang语言有着和原生Socket一样好的低

RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP协议,RabbitMQ高性能的原因之一是因为基于Erlang语言开发,Erlang语言有着和原生Socket一样好的低延迟。

官网:https://www.rabbitmq.com/



一、AMQP核心概念

Server:又称Broker,接受客户端的连接,实现AMQP实体服务。

Connection:连接,应用程序与Broker的网络连接。

Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。

Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body侧则就是消息体内容。

Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue。

Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。

Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。

Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。

Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

 


二、RabbitMQ的整体架

 

AMQP 中消息的路由过程和JMS(Java Message Service) 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者发布消息到Exchange上,通过 Binding 决定交换器的消息应该发送到哪个队列,最终到达队列并被消费者接收。

 


三、Exchange 类型

目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎不用

 

注意:理论上Exchange与队列支持多对多,但是在实际业务中一般一个队列只对应一个Exchange,一个Exchange对应多个队列,不然设计过于复杂。


1.direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“user”,则只转发 routing key 标记为“user”的消息,不会转发“user.name”,也不会转发“user.group”等等。它是完全匹配、单播的模式。


2.topic

topic 交换器通过路由键的模式匹配分配消息到队列,可通过通配符进行模糊匹配,它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它会识别两个通配符:符号“#”和符号“”。

#匹配0个或多个单词

*匹配不多不少一个单词

示例:

notice.# 能匹配到 notice.cluster.g001 和  notice.global

notice.* 只能匹配到 notice.global

 


3.fanout

广播模式,简单理解时可看做特殊的topic(路由键#)

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。


四、RabbitMQ特性


1、持久化



  •  交换机持久化



 

交换机的持久化会将交换机的属性在服务器内部保存,当重启RabbitMQ服务器时不需要重新手动或执行代码去建立交换机,交换机会自动建立。

 

durable参数

 

true:交换机的属性在服务器内部保存,当重启RabbitMQ服务器时不需要重新手动或执行代码去建立交换机,交换机会自动建立。

 

false:重启后该交换机会被删掉

 

 



  • 队列持久化

 

队列持久化类似于交换机持久化

 

durable参数

 

true:队列的属性在服务器内部保存,当重启RabbitMQ服务器时不需要重新手动或执行代码去建立队列,队列会自动建立。

 

false:重启后该队列会被删掉

 

 



  • 消息持久化

 

RabbitMQ的消息是依附于队列存在的,所以想要消息持久化,那么前提是队列也要持久化。

 

消息的持久化与交换机持久化与队列持久化有所不同,消息的持久化在于创建消息的时候,加一个持久化消息的属性

 

消息属性deliveryMode值为1时表示消息不持久化,为2时则表示消息持久化,且把消息存在磁盘里

 


2、自动删除

autoDelete = true,表示自动删除,自动删除的条件是向后的。

自动删除的条件是向后的意思是

对于交换机,有队列或者其他交换机绑定了本交换机,然后所有队列和其他交换机都解除了绑定本交换机,autoDelete = true时,本交换机会被自动删除

对于队列,有消费者订阅了本队列,然后所有队列解除了订阅本队列,autoDelete = true时,本队列会被自动删除,即使队列中还有消息

 


3、Confirm确认消息

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。

如何实现Confirm消息确认?

第一步:在channel上开启确认模式:channel:confirmSelect()

第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等待后续处理!

 


4、Return返回消息

Return Listener 用于处理一些不可路由的消息

我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费监听队列,进行消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们监听这种不可达的消息,就是使用Return Listener!

在基础API中有一个关键的配置项:Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker端会自动删除该消息!

 


5、消息的限流

 



  • 假设一个场景,首先,我们Rabbitmq服务器上有上万条未处理的消息,我们随便打开一个消费客户端,会出现以下情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据。

  • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前,不进行消费新的消息。 

 

 



  • void BasicQos(unit prefetchSize,ushort prefetchCount,bool flobal)

 

  prefetchSize : 报文的大小(RabbitMQ暂不支持)

  prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack

  gaobal: true\false 是否将上面设置应用于channel,就是上面限制是channel级别还是consumer级别(RabbitMQ暂不支持)

 


6、TTL消息

TTL Time to Live的缩写,也就是生存时间

 



  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。

  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息就会自动的清除。

 


7、死信队列  

死信队列:DLX, Dead-Letter-Exchange



  • 利用DLX,当消息在队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。



  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。



  • 当这个队列中有死信时,RabbitMQ就会自动将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。



  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。



 

immediate参数补充说明

RabbitMQ3.0以前,immediate标志位设置为true时,如果exchange在将消息路由到的所有队列都没有消费者时,该消息会通过basic.return方法返还给生产者。immediate的参数功能和Mandatory做对比,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器至少将该消息route到具有消费者的一个队列中,否则将消息返还给生产者;

 

消息变成死信有以下几种情况



  • 消息被拒绝(basic.reject/basic,nack)并且requeue=false

  • 消息TTL过期

  • 队列达到最大长度

 


8、消息的Ack与重回队列

 


9、Message属性

 

 



五、使用场景

1 服务解耦

2 削峰填谷

3异步化缓冲(最终一致性)


 

 





推荐阅读
  • 在Kubernetes上部署JupyterHub的步骤和实验依赖
    本文介绍了在Kubernetes上部署JupyterHub的步骤和实验所需的依赖,包括安装Docker和K8s,使用kubeadm进行安装,以及更新下载的镜像等。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • .NetCoreWebApi生成Swagger接口文档的使用方法
    本文介绍了使用.NetCoreWebApi生成Swagger接口文档的方法,并详细说明了Swagger的定义和功能。通过使用Swagger,可以实现接口和服务的可视化,方便测试人员进行接口测试。同时,还提供了Github链接和具体的步骤,包括创建WebApi工程、引入swagger的包、配置XML文档文件和跨域处理。通过本文,读者可以了解到如何使用Swagger生成接口文档,并加深对Swagger的理解。 ... [详细]
  • RabbitMq的最终一致性分布式事务
    RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5. ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
  • 使用正则表达式爬取36Kr网站首页新闻的操作步骤和代码示例
    本文介绍了使用正则表达式来爬取36Kr网站首页所有新闻的操作步骤和代码示例。通过访问网站、查找关键词、编写代码等步骤,可以获取到网站首页的新闻数据。代码示例使用Python编写,并使用正则表达式来提取所需的数据。详细的操作步骤和代码示例可以参考本文内容。 ... [详细]
  • phpcomposer 那个中文镜像是不是凉了 ... [详细]
  • celery 爬虫使用
    简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件, ... [详细]
  • rabbtimq相关下载与安装
    rabbitmq与elang版本关系查看https:www.rabbitmq.comwhich-erlang.html#eol-series  rpm下载地址https:githu ... [详细]
  • OpenStack 的 Nova 和 Glance 组件
    简单回顾一下OpenStack三大组件的用途:OpenStackCompute(Nova),为云组织的控制器,它提供一个工具来部署云&#x ... [详细]
  •  Rabbitmq是对AMQP协议的一种实现。使用范围也比较广泛,主要用于消息异步通讯。 ... [详细]
  • RabbitMQ消息中间件快速入门:SpringBoot整合生产者与消费者
    前言本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢?我们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。 ... [详细]
author-avatar
我_纯情至上
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有