官方文档链接:https://www.rabbitmq.com/tutorials/amqp-concepts.html
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的协议。
生产者(Publisher)发送消息,经由交换机(Exchange)。交换机根据路由(routingKey)规则将收到的消息分发给与该交换机绑定(Bind)的队列。最后AMQP代理会将消息投递给订阅此队列的消费者,或者消费者按照需求自行获取。
交换机是用来发送消息的AMQP实体。
交换机拿到一个消息之后将它路由给一个或零个队列。它使用那种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
AMQP0-9-1的代理提供了四种交换机:
Name(交换机类型) | Default pre-declared names(预声明的默认名称) |
---|---|
Direct exchange(直连交换机) | (Empty string)and amq.direct |
Fanout exchange(扇形交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换) | amq.match (and amq.headers in rabbitmq) |
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
交换机可以有两个状态:持久(durable)、暂存(transient)。
持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。并不是所有的交换机都需要持久化。
在正式的介绍五种交换机(包括默认交换机)前,在这里重申一下,发布者生产的消息中包含了交换机的类型。消息中声明了交换机类型不同,路由规则也就不同,也就会采取不同的规则将消息投入到队列中。
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机 (direct exchange)。
它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个栗子:当你声明了“search-indexing-online”的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为“search-indexing-online”。
因此,携带着名为“search-indexing-online”的路由键的消息被发送到默认的交换机的时候,次消息会被默认交换机路由至名为“search-indexing-online”的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)。(尽管它也可以处理多播路由)。
下面介绍它是如何工作的:
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
直连交换机的队列通常是循环分发多任务给多个消费者(我门称之为轮询)。比如说有3个消费者,4个任务。分别分发给每个消费者一个任务后,第4个任务又分发给了第一个消费者。
综上,我们很容易得出一个结论,在AMQP 0-9-1中,消息的负载均衡是发声在消费者(consumer)之间的,而不是队列(queue)之间。
扇形交换机(fanout exchange)将消息路由给绑定到它生上的所有的队列。而不理会绑定的路由键。如果N个队列绑定到某个扇形交换机上,当有消息发送给扇形交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇形交换机处理消息的广播路由(broadcast routing)。因为扇形交换机投递消息的拷贝到所有绑定到它的队列上,所以他的应用案例都极其相似:
上图所示,生产者(P)生产消息1将消息1推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。
前面提到的direct规则是严格意义上的匹配,换言之Routing key必须与Binding key相匹配的时候才将消息传送给queue。
而Topic的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。
它的约定是:
*
、#
,用于做模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是0个)。.
分割的字符串(我们将被句点号.
分割开的每一段独立字符串成为一个单词),如stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。.
分割的字符串。当生产者发送消息routing key=quick.orange.rabbit,这时候满足Q1、Q2,所以会被路由到Q1、Q2中,如果routing key=quick.orange.fox,就只会被路由到Q1。
主题交换机拥有非常多用户案例。无论何时,当一个问题涉及到哪些想要有针对性的选择需要接受消息的多消费者/多应用(multiple consumers/applications)的时候,主题交换机都可以被列入考虑范围。
使用案例:
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息的内容中的headers属性进行匹配。
头交换机可以视为直连交换机的另一种变现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少使用到头部交换机)。
工作流程:
类型名称 | 路由规则 |
---|---|
Default | 自动命名的直交换机 |
Direct | Routing Key==Binding Key,严格匹配 |
Fanout | 把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中 |
Topic | Routing Key==Binding Key,模糊匹配 |
Headers | 根据发送的消息内容中的 headers 属性进行匹配 |
AMQP 中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。
队列跟交换机共享某些属性,但是队列也有一些另外的属性。
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在 AMQP 0-9-1 模型中,有两种途径可以达到此目的:
使用push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。
消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于 “拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成)。
当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。
当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生产应用每分钟才发送一条消息,这说明处理工作尚在运行。)
注意,RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。
AMQP模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于AMQP 0-9-1明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:
有些属性是被AMQP代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟HTTP协议的X-Headers很相似。消息属性需要在消息被发布的时候定义。
AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。
消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似JSON这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。
AMQP连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。
有些应用需要与AMQP代理建立多个连接。无论怎样,同时开启多个TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。
在涉及多线程 / 进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts-vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
AMQP 0-9-1 拥有多个扩展点:
这些特性使得 AMQP 0-9-1 模型更加灵活,并且能够适用于解决更加宽泛的问题。
AMQP 0-9-1 拥有众多的适用于各种流行语言和框架的客户端。其中一部分严格遵循 AMQP 规范,提供 AMQP 方法的实现。另一部分提供了额外的技术,方便使用的方法和抽象。有些客户端是异步的(非阻塞的),有些是同步的(阻塞的),有些将这两者同时实现。有些客户端支持 “供应商的特定扩展”(例如 RabbitMQ 的特定扩展)。
因为 AMQP 的主要目标之一就是实现交互性,所以对于开发者来讲,了解协议的操作方法而不是只停留在弄懂特定客户端的库就显得十分重要。这样一来,开发者使用不同类型的库与协议进行沟通时就会容易的多。