队列结构
通常队列由rabbit_amqqueue_process 和backing_queue这两部分组成
:
rabbit_amqqueue_process
负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm 和消费端的ack)等。backing_queue
是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。
而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。
消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断地流动,消息的状态会不断发生变化。RabbitMQ 中的队列消息可能会处于以下4种状态:
alpha
: 消息内容(包括消息体、属性和headers)和消息索引都存储在内存中。beta
: 消息内容保存在磁盘中,消息索引保存在内存中。gamma
: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有。delta
: 消息内容和索引都在磁盘中。
RabbitMQ在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count)
:
如果alpha状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息可能会转换到beta状态、gamma状态或者delta状态。
区分这4种状态的主要作用是满足不同的内存和CPU需求:
- alpha状态最耗内存,但很少消耗CPU
- delta状态基本不消耗内存,但是需要消耗更多的CPU和磁盘I/O操作。
- delta状态需要执行两次I/O操作才能读取到消息,一次是读消息索引(从rabbit_queue_index 中),一次是读消息内容(从rabbit_ msg_store 中);
- beta和gamma状态都只需要一次IO操作就可以读取到消息(从rabbit_msg_store 中)。
对于普通的没有设置优先级和镜像的队列来说,backing_queue 的默认实现是rabbit_variable_queue, 其内部通过5个子队列Q1、Q2、Delta、Q3和Q4来体现消息的各个状态。
整个队列包括rabbit_amqqueue_process 和backing_queue 的各个子队列,队列的结构可以参考下图:
- Q1、Q4只包含alpha状态的消息
- Q2和Q3包含beta和gamma状态的消息
- Delta 只包含delta状态的消息。
一般情况下,消息按照Q1→Q2 -> Delta→Q3→Q4这样的顺序步骤进行流动,但并不是每一条消 息都一定会经历所有的状态,这个取决于当前系统的负载状况
。从Q1至Q4基本经历内存到磁盘,再由磁盘到内存这样的一个过程,如此可以在队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,而在负载降低的时候,这部分消息又渐渐回到内存被消费者获取,使得整个队列具有很好的弹性。
消费者获取消息也会引起消息的状态转换。当消费者获取消息时,首先会从Q4中获取消息,如果获取成功则返回。如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息。如果Q3不为空,则取出Q3中的消息,进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为Q2、Delta、Q3、Q4全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4中获取消息。如果Q3为空,Delta 不为空,则将Delta 的消息转移至Q3中,下次可以直接从Q3中获取消息。
在将消息从Delta转移到Q3的过程中,是按照索引分段读取的,首先读取某一段,然后判断读取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中已无消息,则直接将Q2和刚读取到的消息一并放入到Q3中;如果不相等,仅将此次读取到的消息转移到Q3。
这里就有两处疑问,第一个疑问是:为什么Q3为空则可以认定整个队列为空?
试想一下,如果Q3为空,Delta不为空,那么在Q3取出最后一条消息的时候,Delta上的消息就会被转移到Q3,这样与Q3为空矛盾;如果Delta为空且Q2不为空,则在Q3取出最后一条消息时会将Q2的消息并入到Q3中,这样也与Q3为空矛盾;在Q3取出最后一条消息之后,如果Q2、Delta、Q3都为空,且Q1不为空时,则Q1的消息会被转移到Q4,这与Q4为空矛盾。其实这一番论述也解释了另一个问题:为什么Q3和Delta都为空时,则可以认为Q2、Delta、 Q3、Q4全部为空。
- 通常在负载正常时,如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。
- 对于durable属性设置为true的消息,它一定会进入gamma状态,并且在开启publisher confirm 机制时,只有到了 gammma状态时才会确认该消息已被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。
在系统负载较高时,已接收到的消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
应对这一问题一般有3种措施:
- 增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度
- 采用multiple ack,降低处理ack带来的开销
- 流量控制
惰性队列
RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能地将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机,或者由于维护而关闭等)致使长时间内不能消费消息而造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能地存储在内存之中,这样可以更加快速地将消息发送给消费者
。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的
,这样可以减少了内存的消耗,但是会增加I/O的使用,如果消息是持久化的,那么这样的I/O操作不可避免,惰性队列和持久化的消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。
队列具备两种模式: default和lazy。默认的为default模式,在3.6.0之前的版本无须做任何变更
。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过x-queue-mode参数来设置队列的模式,取值为default和lazy。下面示例演示了一个惰性队列的声明细节:
Map<String, Object> args &#61; new HashMap<>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
对应的Policy设置方式为&#xff1a;
rabbitmqctl set_policy Lazy "^myqueue$" &#39;{"queue-mode":"lazy"}&#39; --apply-to-queues
惰性队列和普通队列相比&#xff0c;只有很小的内存开销。这里很难对每种情况给出一个具体的数值&#xff0c;但是我们可以类比一下:发送1千万条消息&#xff0c;每条消息的大小为1KB&#xff0c;并且此时没有任何的消费者&#xff0c;那么普通队列会消耗1.2GB的内存&#xff0c;而惰性队列只消耗1.5MB的内存。
据官方测试数据显示&#xff0c;对于普通队列&#xff0c;如果要发送1千万条消息&#xff0c;需要耗费801秒&#xff0c;平均发送速度约为13000条/秒。如果使用惰性队列&#xff0c;那么发送同样多的消息时&#xff0c;耗时是421秒&#xff0c;平均发送速度约为24000条/秒。出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。
如果有消费者消费时&#xff0c;惰性队列会耗费将近40MB的空间来发送消息&#xff0c;对于一个消费者的情况&#xff0c;平均的消费速度约为14000条/秒。
如果要将普通队列转变为惰性队列&#xff0c;那么我们需要忍受同样的性能损耗&#xff0c;首先需要将缓存中的消息换页至磁盘中&#xff0c;然后才能接收新的消息。反之&#xff0c;当将一个惰性队列转变为普通队列的时候&#xff0c;和恢复一个队列执行同样的操作&#xff0c;会将磁盘中的消息批量地导入到内存中。