热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

提升RabbitMQ消费速度的一些实践

RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitM

RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitMQ消费速度的一些方法和实践,比如增加消费者、提高Prefetch count、多线程处理、批量Ack等。


增加消费者


这个道理比较容易理解,多个人搬砖的速度肯定比一个人要快很多。

不过实际情况中还需要面对一些技术挑战,比如后端处理能力、并发冲突,以及处理顺序。

后端处理能力:比如多个消费者都要操作数据库,那么数据库连接的并发数和读写吞吐量就是后端处理能力,如果达到了数据库的最大处理能力,增加再多的消费者也没有用,甚至会因为数据库拥塞导致整体消费速度的下降。这个问题还存在另一种情况,就是消费者是否真正的发挥了后端服务的处理能力,比如使用Redis时是否采用了多线程、IO复用等方式来进一步提升吞吐量。

并发冲突:比如两个消费者都要去修改用户的积分,单个消费者的做法可能就是取出来、改下字段的值、最后再update到数据库,多个消费者时如果同时取出了相同的数据,还这样处理的话就会出问题了。这时候可能需要修改下SQL语句,直接在SQL语句中修改积分,由数据库写入事务来处理并发冲突;或者搞一个分布式锁,对于具体的某个用户同时只能有一个消费者来处理其积分。

处理顺序:如果消息需要被顺序处理,那么各个消费者之间还需要增加一个同步机制。比如基于GPS定位的电子围栏,在出围栏的某个时段,先产生了围栏内定位消息、然后产生了围栏外定位消息;如果围栏外定位消息先被一个消费者处理,则判定为出围栏,这没有问题;然后围栏内定位消息被另一个消费者处理,则会被判定为入围栏,这个就属于误判了。这时候可能要同步一个已处理定位时间,早于这个时间的定位就抛弃掉;或者同一个设备的定位消息通过某种算法控制只能由某个消费者进行处理。

解决后边两个问题的方法不可避免的要引入多个消费者之间的协商机制,如果这些协商机制设计不好会对处理速度带来很大影响。因此多人搬砖速度快的前提是多个人搬砖时不需要大家频繁的坐下来协商谁搬哪块砖,否则就会浪费很多时间在相互协调上,反而不能提升搬砖的速度。

所以通过增加消费者提升消费速度得以成立的前提是消费者业务并发处理能力要足够,消费者依赖的后端服务处理能力也要足够。这是此种方式的关键点。


提高Prefetch count


消息消费速度主要受到发送消息时间、消费者处理时间、消息Ack时间这几个时间的影响,如果一个消息走完这个流程再发送另一个的话,效率将会非常低。可以让消息在这几个时间内恰当的分配,让消息总是连续不断的被消费者接收处理,就可以提升消费者的消费速度。

根据如上描述,有些消息可能正在被消费者处理,有些可能在等待消费者处理,有的消息可能还在网络传输中,而如果不限制传输的数量,消费者端可能因处理能力补足会堆积大量的消息,首先内存使用将不可控制,其次此时也无法将这些消息再分配给别的消费者。因此才有了Prefetch count,用于控制消息发送给消费者的速度;这个方案需要配合Ack使用,消费者回复消息Ack后,RabbitMQ才会继续发送同等数量的消息到消费者。提高Prefetch count到一个合适的值可以提升消息的消费速度。这个值的设定可能还要实时参考上边提到的三个时间,这有点类似TCP的流控措施。这个值的计算方法请看下文:


RabbitMQ关于吞吐量,延迟和带宽的一些理论


参考文档:https://blog.csdn.net/gbbqrglvir3dyi82/article/details/78663828

多线程处理


多线程处理和增加消费者有异曲同工之妙。多线程处理不需要建立多个到RabbitMQ的连接,它在收到队列消息后将其放入不同的线程中进行处理,这样进程中就会有多个消息同时处理,增加了消费吞吐量,从而提升了消费速度。

来看一个例子:

consumer.Received += (o, e) =>
{
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessSingleContextMessage), e);
};

 

在这个例子中波斯码将收到的消息放入线程池队列进行处理,注意这里需要配合上一节提到的Prefetch count,设置一个合适的值,消费者就可以同时处理多条消息了。

多线程处理也存在多消费者处理时的问题,只不过在一个进程中处理并发冲突和消息顺序的成本可能更低一些。下边的代码片段展示了一个解决消息顺序处理问题的方案:

// 接收消息存入列表,当接收数量达到prefetchCount/2时就加入处理队列;
// 1/2是考虑了消息从RabbitMQ到消费者的传输时间,不需要等所有的消息都到达了才开始处理。
consumer.Received += (o, e) =>
{
lock(receiveLocker){
basicDeliverEventArgsList.Add(e);
if (basicDeliverEventArgsList.Count >= prefetchCount/2)
{
var deliverEventArgs = basicDeliverEventArgsList.ToArray();
basicDeliverEventArgsList.Clear();
EnProcessQueue(deliverEventArgs);
}
}
};
// 此处省略数据出队列的代码,请自行脑补
....
// 然后这个方法是用来处理消息的,将消息根据数据Key分成若干组,放到多个任务中并行处理;
// 相同数据Key的消息将分配到一个组中,在这个组中数据被顺序处理
private void Process(BasicDeliverEventArgs[] args)
{
if (args.Length <= 0)
{
return;
}
try
{
var tasks = CreateParallelProcessTasksByDataKey(args);
Task.WaitAll(tasks);
}
catch (Exception ex)
{
ToLog("处理任务发生异常", ex);
}
}
// 创建并行处理多条消息的任务
private Task[] CreateParallelProcessTasksByDataKey(BasicDeliverEventArgs[] args)
{
// 根据dataKey进行分组,dataKey可以放到消息的header中进行传输,这里就不给出具体的分组方法了
Dictionary> eDic = GetMessgeGroupByDataKey(args);
// 任务数量
var paralleTaskNum = this.parallelNum;
if (paralleTaskNum > eDic.Count)
{
paralleTaskNum = eDic.Count;
}
// 每个任务处理的消息数量
var perTaskNum = (int)Math.Ceiling(args.Length / (double)paralleTaskNum);
// 任务数组
List tasks = new List();
var taskArgs = new List();
for (int j = eDic.Count - 1; j >= 0; j--)
{
var currentElement = eDic.ElementAt(j);
taskArgs.AddRange(currentElement.Value);
eDic.Remove(currentElement.Key);
if (taskArgs.Count >= perTaskNum || j == 0)
{
// 创建任务,并处理分配的消息
var taskList = taskArgs.Select(d => d).ToList();
taskArgs.Clear();
var task = Task.Factory.StartNew(() =>
{
// 这这里处理分组中的消息
...
});
tasks.Add(task);
}
}
return tasks.ToArray();
}

 

上边这段代码中解决问题的关键就是将消息进行分组,同组内的消息顺序处理,分组间并行处理,既通过多线程提升了消息整体的处理速度,又能支持消息的顺序处理。


批量Ack


这种方式有效的原理是:每条消息分别Ack的情况下,RabbitMQ收到一个Ack才发送一条消息,这中间就会有很多的时间在等待Ack回来,通过批量Ack的方式,减少了很多Ack传输的时间。注意这里隐含的方式是RabbitMQ通过设置的Prefetch count连续向消费者发送多条消息,否则这个批量就没意义了。

下边的代码片段给出其使用方式:

channel.BasicAck(e.DeliveryTag, true);

 

第2个参数为true就是指示采用批量Ack的方式,凡是delivery-­tag比第1个参数小的消息都会被Ack。

这里需要注意:如果消费者在处理某条消息时失败了,业务上又要求不能丢失任何消息,这时就不能对所有的消息进行批量Ack,否则RabbitMQ就不会再次投递这条消息了,这需要根据自己的实际情况进行取舍。解决此问题的一个简单方法是,跟踪所有消息的处理结果,如果全部成功则使用批量Ack,如果部分成功则有两个选择:如果不关注顺序则退化为每个消息发送Ack或Reject的方式;如果关注顺序则本次接收到Prefetch count数量的消息全部nack,否则reject的消息再次投递时顺序就不对了,这时候业务还要做好处理重复数据的逻辑。


总结

通过分析上边的这些方法,在使用RabbitMQ消费时可以遵循这样一个路径:



  1. 启用Prefetch count设置;

  2. 先1个消费者,1次只接收1条,处理完毕后再传输下一条,这样可以避免并发冲突和消息顺序问题;

  3. 如果消费速度不满足要求,则1次接收多条,按接收顺序处理;

  4. 如果消费速度还是不满足要求,则1次接收多条,并行处理;

  5. 如果消费速度还是不满足要求,则启动多个消费者,并行处理。

  6. 如果消费速度还是不满足要求,改需求,或者换别的中间件。

在这个过程中需要始终关注优化消费者及后端程序处理能力,比如优化SQL语句、使用缓存、使用负载均衡等等,加快处理速度就能提升消费速度,而且很多时候就是程序处理太耗时了。

关于重复数据、并发冲突、顺序处理问题的处理:



  • 随时做好处理重复数据的准备,因为不只消费者端可能会触发消息的重复投递,发送端也可能重复发送消息,这个很难避免。

  • 对于并发冲突问题,消费者进程内可以使用锁,跨消费者引入第三方机制来处理,比如使用Redis原子操作、数据库原子操作或者分布式锁。

  • 对于顺序处理问题,最好没有这个需求;在同一个消费者内可以分组处理;在多个消费者时使用队列分组,每个队列绑定不同的Route key,不同Route key代表的消息之间没有顺序关联。波斯码再次提醒还要注意处理失败时的逻辑,避免重新投递消息的顺序问题。



推荐阅读
  • 本文介绍了进程的基本概念及其在操作系统中的重要性,探讨了进程与程序的区别,以及如何通过多进程实现并发和并行。文章还详细讲解了Python中的multiprocessing模块,包括Process类的使用方法、进程间的同步与异步调用、阻塞与非阻塞操作,并通过实例演示了进程池的应用。 ... [详细]
  • 本文通过对OkHttp源码的详细解读,旨在帮助读者理解其核心执行流程,特别是同步与异步请求的处理方式。文中不仅涵盖了基本的使用示例,还深入探讨了OkHttp的核心功能——拦截器链的工作原理。 ... [详细]
  • 深入解析线程池的工作原理与实际应用
    本文详细探讨了线程池的核心概念、工作原理及其在实际开发中的应用,包括不同类型的线程池创建方式及其适用场景。 ... [详细]
  • Oracle中打开10046Trace的各种方法10046trace的跟踪等级10046是一个Oracle的内部事件(event),通过设置这个事件可以得到Oracl ... [详细]
  • 本文探讨了在Qt框架下实现TCP多线程服务器端的方法,解决了一个常见的问题:服务器端仅能与最后一个连接的客户端通信。通过继承QThread类并利用socketDescriptor标识符,实现了多个客户端与服务器端的同时通信。 ... [详细]
  • 深入浅出:Hadoop架构详解
    Hadoop作为大数据处理的核心技术,包含了一系列组件如HDFS(分布式文件系统)、YARN(资源管理框架)和MapReduce(并行计算模型)。本文将通过实例解析Hadoop的工作原理及其优势。 ... [详细]
  • 基于OpenCV的小型图像检索系统开发指南
    本文详细介绍了如何利用OpenCV构建一个高效的小型图像检索系统,涵盖从图像特征提取、视觉词汇表构建到图像数据库创建及在线检索的全过程。 ... [详细]
  • 构建Python自助式数据查询系统
    在现代数据密集型环境中,业务团队频繁需要从数据库中提取特定信息。为了提高效率并减少IT部门的工作负担,本文探讨了一种利用Python语言实现的自助数据查询工具的设计与实现。 ... [详细]
  • 深入解析mt_allocator内存分配器(二):多线程与单线程场景下的实现
    本文详细介绍了mt_allocator内存分配器在多线程和单线程环境下的实现机制。该分配器以2的幂次方字节为单位分配内存,支持灵活的配置和高效的性能。文章分为内存池特性描述、内存池实现、单线程内存池实现、内存池策略类实现及多线程内存池实现等部分,深入探讨了内存池的初始化、内存分配与回收的具体实现。 ... [详细]
  • 深入解析C++ Atomic编程中的内存顺序
    在多线程环境中,为了防止多个线程同时修改同一数据导致的竞争条件,通常会使用内核级同步对象,如事件、互斥锁和信号量等。然而,这些方法往往伴随着高昂的上下文切换成本。本文将探讨如何利用C++11中的原子操作和内存顺序来优化多线程编程,减少不必要的开销。 ... [详细]
  • 本文介绍了推荐系统的基本概念及其在个性化服务中的重要作用,重点探讨了协同过滤算法的工作原理,包括基于用户的协同过滤和基于物品的协同过滤两种方式,并详细解释了几种常见的相似度计算方法。 ... [详细]
  • 本文探讨了在Python中多线程与多进程的性能差异,特别是在处理CPU密集型任务和I/O密集型任务时的表现。由于全局解释器锁(GIL)的存在,多线程在利用多核CPU方面表现不佳,而多进程则能有效利用多核资源。 ... [详细]
  • 本文详细介绍了如何在PHP中使用Memcached进行数据缓存,包括服务器连接、数据操作、高级功能等。 ... [详细]
  • Android 开发技巧:使用 AsyncTask 实现后台任务与 UI 交互
    本文详细介绍了如何在 Android 应用中利用 AsyncTask 来执行后台任务,并及时将任务进展反馈给用户界面,提高用户体验。 ... [详细]
  • SSE图像算法优化系列三:超高速导向滤波实现过程纪要(欢迎挑战)
    自从何凯明提出导向滤波后,因为其算法的简单性和有效性,该算法得到了广泛的应用,以至于新版的matlab都将其作为标准自带的函数之一了&#x ... [详细]
author-avatar
SCY瑶_450
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有