本文大纲:
1. RabbitMQ简介
2. RabbitMQ安装与配置
3. C# 如何使用RabbitMQ
4. 几种Exchange模式
5. RPC 远程过程调用
6. RabbitMQ高可用集群搭建
一、RabbitMQ简介
1、介绍
RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。
RabbitMQ官网:http://www.rabbitmq.com
2、AMQP
AMQP是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
3、系统架构
消息队列的使用过程大概如下:
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5) 客户端投递消息到exchange。exchange接收到消息后,就根据消息的
key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
如上图所示:AMQP里主要说两个组件:Exchange和Queue。绿色的X就是Exchange ,红色的是Queue ,这两者都在Server端,又称作Broker,这部分是RabbitMQ实现的,而蓝色的则是客户端,通常有Producer和Consumer两种类型。
4、几个概念
-
P: 为Producer,数据的发送方。
-
C:为Consumer,数据的接收方。
-
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
-
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
-
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
-
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
-
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
-
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
二、RabbitMQ安装与配置
1、安装
RabbitMQ是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang。
erlang:http://www.erlang.org/download.html
RabbitMQ:http://www.rabbitmq.com/download.html
注意:
-
现在先别装最新的 3.6.3 ,本人在安装完最新的版本,queue 队列有问题,降到了 3.6.2 就解决了。
-
默认安装的RabbitMQ监听端口是:5672
2、配置
(1)安装完以后erlang需要手动设置ERLANG_HOME 的系统变量。
输入:set ERLANG_HOME=C:\Program Files\erl8.0
(2)激活RabbitMQ's Management Plugin
使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活。
输入:rabbitmq-plugins.bat enable rabbitmq_management
4、创建vhosts
创建vhosts,在admin页面,点击右侧Virtual Hosts:
将刚创建的OrderQueue分配给相关用户。
说明:前面我们提到过,RabbitMQ由Producer(生成者) 和Consumer(消费者)两部分组成。Weiz.Consumer就是Consumer(消费者),Weiz. Producer为Producer(生成者),Weiz.MQ为消息队列的通用处理类库。
3、项目搭建
(1)Weiz.MQ 项目,消息队列的通用处理类库,用于正在的订阅和发布消息。
-
通过nuget安装项目EasyNetQ 相关组件(略)
-
增加BusBuilder.cs管道创建类,主要负责链接RabbitMQ。
-
增加IProcessMessage类,定义了一个消息方法,用于消息传递
-
增加MQHelper类,用于正在的订阅和发布消息。
Consumer 消费者,使用的是Subscribe (订阅)的模式,所以,Weiz.Consumer客户端启动后,会自动创建connection,生成相关的exchange 和queue。
(2)启动Weiz. Producer 里的TestMQ.aspx 页面,往队列里面写一条消息。订阅的消费者立马就能拿到这条消息。
至此,C#向RabbitMQ消息队列发送消息已经简单完成。
四、几种Exchange模式
本节说些理论的东西——Exchange 的几种模式。
AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
RabbitMQ提供了四种Exchange模式:fanout、direct、topic、header 。 header模式在实际使用中较少,本文只对前三种模式进行比较。
1、Fanout Exchange
2、Direct Exchange
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。
Direct模式可以使用RabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
3、Topic Exchange
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。
所以,Topic Exchange 使用非常灵活。
基本概念:
Callback queue 回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
Correlation id 关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。
流程说明:
-
当客户端启动的时候,它创建一个匿名独享的回调队列。
-
在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
-
将请求发送到一个 rpc_queue 队列中。
-
服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
-
客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用
5、完整代码
(1)创建两个控制台程序,作为RPC Server和RPC Client,引用RabbitMQ.Client
(2) RPC Server
(3)RPC Client
(4)分别运行Server和Client
六、RabbitMQ高可用集群
RabbitMQ是用erlang开发的,集群非常方便,因为erlang天生就是一门分布式语言,但其本身并不支持负载均衡。Rabbit模式大概分为以下三种:单一模式、普通模式、镜像模式。
-
单一模式:最简单的情况,非集群模式。
没什么好说的。
-
普通模式:默认的集群模式。
对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构。
当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer。
所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连A或B,出口总在A,会产生瓶颈。
该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。
如果做了消息持久化,那么得等A节点恢复,然后才可被消费;如果没有持久化的话,然后就没有然后了……
-
镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案。
该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。
该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。
所以在对可靠性要求较高的场合中适用(后面会详细介绍这种模式,目前我们搭建的环境属于该模式)。
1、集群中的基本概念
RabbitMQ的集群节点包括内存节点、磁盘节点。顾名思义内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘。不过,如前文所述,如果在投递消息时,打开了消息的持久化,那即使是内存节点,数据还是安全的放在磁盘。
一个RabbitMQ集群中可以共享user、vhost、queue、exchange等,所有的数据和状态都是必须在所有节点上复制的,一个例外是那些当前只属于创建它的节点的消息队列,尽管它们可见且可被所有节点读取。RabbitMQ节点可以动态地加入到集群中,一个节点它可以加入到集群中,也可以从集群环集群进行一个基本的负载均衡。
集群中有两种节点:
-
内存节点:只保存状态到内存(一个例外的情况是:持久的queue的持久内容将被保存到disk)
-
磁盘节点:保存状态到内存和磁盘。
内存节点虽然不写入磁盘,但是它执行比磁盘节点要好。集群中,只需要一个磁盘节点来保存状态 就足够了
如果集群中只有内存节点,那么不能停止它们,否则所有的状态,消息等都会丢失。
思路:
那么具体如何实现RabbitMQ高可用,我们先搭建一个普通集群模式,在这个模式基础上再配置镜像模式实现高可用,Rabbit集群前增加一个反向代理,生产者、消费者通过反向代理访问RabbitMQ集群。
架构图如下:
分别查看下每个节点
这时我们可以看到每个节点的集群信息,分别有两个内存节点一个磁盘节点。
Step 7:
往任意一台集群节点里写入消息队列,会复制到另一个节点上,我们看到两个节点的消息队列数一致:
-p参数为vhost名称
这样RabbitMQ集群就正常工作了。
这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
为什么RabbitMQ不将队列复制到集群里每个节点呢?这与它的集群的设计本意相冲突,集群的设计目的就是增加更多节点时,能线性的增加性能(CPU、内存)和容量(内存、磁盘)。理由如下:
1. storage space: If every cluster node had a full copy of every queue, adding nodes wouldn’t give you more storage capacity. For example, if one node could store 1GB of messages, adding two more nodes would simply give you two more copies of the same 1GB of messages.
2. performance: Publishing messages would require replicating those messages to every cluster node. For durable messages that would require triggering disk activity on all nodes for every message. Your network and disk load would increase every time you added a node, keeping the performance of the cluster the same (or possibly worse).
当然RabbitMQ新版本集群也支持队列复制(有个选项可以配置)。比如在有五个节点的集群里,可以指定某个队列的内容在2个节点上进行存储,从而在性能与高可用性之间取得一个平衡。
3、镜像模式配置
上面配置RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制,虽然该模式解决一部分节点压力,但队列节点宕机直接导致该队列无法使用,只能等待重启,所以要想在队列节点宕机或故障也能正常使用,就要复制队列内容到集群里的每个节点,需要创建镜像队列。
我们看看如何镜像模式来解决复制的问题,从而提高可用性 。
Step 1:增加负载均衡器
关于负载均衡器,商业的比如F5的BIG-IP,Radware的AppDirector,是硬件架构的产品,可以实现很高的处理能力。但这些产品昂贵的价格会让人止步,所以我们还有软件负载均衡方案。互联网公司常用的软件LB一般有LVS、HAProxy、Nginx等。LVS是一个内核层的产品,主要在第四层负责数据包转发,使用较复杂。HAProxy和Nginx是应用层的产品,但Nginx主要用于处理HTTP,所以这里选择HAProxy作为RabbitMQ前端的LB。
HAProxy的安装使用非常简单,在Centos下直接yum install haproxy,然后更改/etc/haproxy/haproxy.cfg 文件即可,文件内容大概如下:
负载均衡器会监听5672端口,轮询我们的两个内存节点172.16.3.107、172.16.3.108的5672端口,172.16.3.32为磁盘节点,只做备份不提供给生产者、消费者使用,当然如果我们服务器资源充足情况也可以配置多个磁盘节点
,这样磁盘节点除了故障也不会影响,除非同时出故障。
Step 2:配置策略
使用Rabbit镜像功能,需要基于RabbitMQ策略来实现,政策是用来控制和修改群集范围的某个vhost队列行为和Exchange行为。
在cluster中任意节点启用策略,策略会自动同步到集群节点
# rabbitmqctl set_policy -p hrsystem ha-allqueue"^" '{"ha-mode":"all"}'
这行命令在vhost名称为hrsystem创建了一个策略,策略名称为ha-allqueue,策略模式为 all 即复制到所有节点,包含新增节点,策略正则表达式为 “^” 表示所有匹配所有队列名称。
例如rabbitmqctl set_policy -p hrsystem ha-allqueue "^message" '{"ha-mode":"all"}'
注意:"^message" 这个规则要根据自己修改,这个是指同步"message"开头的队列名称,我们配置时使用的应用于所有队列,所以表达式为"^"
官方set_policy说明参见
set_policy [-p vhostpath] {name} {pattern} {definition} [priority]
(http://www.rabbitmq.com/man/rabbitmqctl.1.man.html)
ha-mode:
Step 3:
创建队列时需要指定ha 参数,如果不指定x-ha-prolicy 的话将无法复制。
下面为C#代码片段:
Step 4:
客户端使用负载服务器172.16.3.110 (panyuntao3)发送消息,队列会被复制到所有节点,当然策略也可以配置制定某几个节点,这时任何节点故障 、或者重启将不会影响我们正常使用某个队列。到这里我们完成了高可用配置(所有节点都宕机那没有办法了)。
使用RabbitMQ管理端可以看到集群镜像模式中对列状态
参考:
-
http://www.rabbitmq.com/clustering.html
-
http://www.rabbitmq.com/ha.html
-
http://www.rabbitmq.com/parameters.html#policies
-
http://www.nsbeta.info/archives/555
-
http://blog.csdn.net/linvo/article/details/7793706
-