1. 什么是AMQP?
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
2. 为什么使用AMQP
为什么使用AMQP或者AMQP解决了什么问题?
在分布式的系统中,子系统之间如果使用socket连接进行通讯,有很多问题需要解决。比如:
1)信息的发送者和接受者如何维持这个连接?如果一方中断,这期间的数据如何防止丢失?
2)如何降低发送者和接受者的耦合度?
3)如何让优先级高的接受者先接到数据?
4)如何做到load balance? 均衡接受者的负载?高并发场景
5)如何将信息发送到相关的接收者,如果接受者订阅了不同的数据,如何正确的分发到接受者?
6)如何做到可扩展,将通信模块发到集群上去?异构系统
7)如何保证接受者接到了完整,正确或是有序的数据?
AMQP解决了这些问题。与此同时,基于AMQP实现的产品相比其他类似产品(AcitveMQ)有着自己的特点。基于AMQP的RabbitMQ具有路由灵活,消息可靠等特点,如果路由策略多样化和消息需要可靠传输的需求的情况下可考虑使用基于AMQP的产品。
3. AMQP 的模型和原理3.1 AMQP 中包含的主要元素
生产者(Producer):向Exchange发布消息的应用。 消费者(Consumer):从消息队列中消费消息的应用。
消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。 消息(Message):传输的内容。
交换器(exchange):路由组件,接收Producer发送的消息,并将消息路由转发给消息队列。
虚拟主机(Virtual Host): 一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。
Broker :AMQP的服务端称为Broker。
连接(Connection):一个网络连接,比如TCP/IP套接字连接。
信道(Channel):多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。
绑定器(Binding):消息队列和交换器直接的关联。
3.2 AMQP 的组件结构图
3.3 AMQP 如何实现通信的
(1)建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。
(2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。
producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
(3)发送消息。由Producer发送消息到Broker中的exchange中。
(4)路由转发。exchange收到消息后,根据一定的路由策略,将消息转发到相应的queue中去。
(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
3.4 exchange 与 Queue 的路由机制
exchange 将消息发送到哪一个queue是由exchange type 和bing 规则决定的,目前常用的有3种exchange,
Direct exchange, Fanout exchange, Topic exchange 。
Direct exchange 直接转发路由,其实现原理是通过消息中的routkey,与queue 中的routkey 进行比对,若二者匹配,则将消息发送到这个消息队列。 Fanout exchange
复制分发路由,该路由不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列。
topic exchange 通配路由,是direct exchange的通配符模式,消息中的routkey可以写成通配的模式,exchange
支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配表达式的queue。
需要注意的一点只有queue具有保持消息的功能,exchange不能保存消息。
4. AQMP 的应用场景AQMP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:4.1 异步处理
比如公司新入职一个员工,需要开通系统账号,有几件事情要做,开通系统账号,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行也可以并行,并行的好处就是可以提高效率,这时可以应用MQ来实现并行。
4.2 应用解耦
在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行。
4.3 流量缓冲
在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量。
4.4 日志处理
将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端(生产者)负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统(消费者)订阅并消费kafka中的日志数据。
5. AMQP 与 RabbitMQ
AMQP 是一种协议, RabbitMQ是一个由erlang开发的AMQP的开源实现,目前使用比较广泛的MQ有RabbitMQ,ActiveMQ,KafKa等等,其中ActiveMQ是基于JMS的一个开源实现,JMS 是一个接口标准或者说是一个API消息服务的规范(JAVA Message Service,java消息服务),KafKa是一种高吞吐量的分布式发布订阅消息系统,通常有吞吐量需求的日志处理和日志聚合应用会使用Kafka,性能要优于Rabbit,但是稳定性和可靠性相对而言RabbitMQ要成熟一些。