热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mq获取消息慢_rabbitmq消息发送卡顿的排查(springamqp)

问题描述线上系统出现SLOW慢请求告警,经排查,发现为rabbitmq的消息发送卡顿引起,卡顿时间几秒钟~几十秒不等。由于只卡顿了1分钟左

问题描述

线上系统出现SLOW慢请求告警,经排查,发现为rabbitmq的消息发送卡顿引起,卡顿时间几秒钟~几十秒不等。由于只卡顿了1分钟左右,笔数不多(30笔以内),且后续就没了,所以只是排查了下系统各项指标(JVM、mq状态、磁盘等),都正常。就没有继续深入了排查了。 但过了一段时间,又出现问题了~~(果然每个线上问题都要仔细追踪- -#)

发生频率:每次系统重启后,都会出现一次,且每个实例出现一次后,后续就不在出现了,直到下一次系统实例重启。 在出现卡顿时,当前实例仍有其他的消息发送是正常的。

单个消息大小:1KB以内。

使用情况描述

rabbitmq:3.7.17, erlang:22.0.7

采用spirng-amqp包进行封装与rabbitmq的交互,版本为1.7.3.RELEASE

使用Spring-amqp的CachingConnectionFactory缓存连接工厂

调整生产者的cache-mode为Connection模式(原先为CHANNEL)

调整生产者的核心连接数量为24个,最大连接数量为48个

设置生产者连接的channel.checkout-timeout为3秒(该参数的旨意是从CachingConnectionFactory获取connection/channel的等待时间,CachingConnectionFactory实际就是个connection或者channel的缓存池子)

排查一: rabbitmq的per-connection连接流控引起的客户端消息发送卡顿

由于通过日志已经能够确定是消息发送这一步产生了卡顿,那初步怀疑就是由于rabbitmq的流控机制引起的卡顿,但根据监控,当时卡顿时间内,rabbitmq的所有connection/channel的状态都是正常的,没有存在flow/block状态的情况。(rabbitmq的监控API:{rabbitmq-admin管理后台域名}/api/index.html)

所以排除是rabbitmq服务器的流控引起的

排查二: 应用客户端自己的问题

由于出现问题是某次spring-amqp的缓存模式cache-mode从channel改为了connection(PS:此处调整,是因为channel模式为单连接,所有的MQ消息,无论各种业务场景都是共用了这个连接,之前发生了一次per-connection的流控,导致这个连接下的所有消息都发不出去,为了降低业务影响,将单连接改造为多连接模式)

所以把目光转向了spring-amqp的使用上,经过代码排查,果然是spring-amqp的一个BUG。。

在使用CachingConnectionFactory的createConnection()创建rabbitmq连接的时候,会存在一个同步锁(由于该工厂就是个缓存池,所以需要同步锁来确保缓存的个数不超过配置的MAX个数),知道连接创建完毕后,才会释放该锁,那么如果存在某个连接创建速度过慢,就会导致后续的连接创建时间也卡顿,都在等待该同步锁的释放

如果cache-mode是connection模式,根据源码展示的,只要从连接缓存池没有拿到一个空闲的connection,就直接等待一个checkout-timeout的时间(配置的是3秒)。这就代表,每一个连接的创建,都必须等待3秒钟。。这就解释了为啥线上的卡顿基本都是3、6、9秒左右。

修复代码也很简单:

但升级spring-amqp包到已修复的版本是2.1.x 中间版本跨度太大, 且2.1.x需要依赖spring 5,我们spring的基础版本还是4.x,贸然升级风险太大

解决

结合业务场景,此处MQ的消息场景是用于将交易请求做异步化处理,达到一个削峰填谷的目的。 改造为多连接模式,是由于之前单连接模式发生了流控后,导致使用该连接的所有业务场景均block。虽然改为了多连接,但实际上还是存在各个业务共享某个连接的情况,隔离性还不是特别高,应该是按重要程度进行分级隔离使用各自的连接。

经过考量,决定使用Spring提供的RoutingConnectionFactory,通过将业务场景码来分隔实际的ConnectionFactory(实际还是CachingConectionFactory,采用单连接channel模式),来达到业务场景隔离连接。

PS:连接并不是越多越好,我们此处只是将业务高等级的区分各个连接,其余实时性要求不是那么高的,单独在共享使用另外一个连接

RoutingConnectionFactory,使用此方式,可根据Expression灵活将连接(connection)分为多个,并根据自己场景定制(例如生产和消费连接分离,生产,消费者又按照不同维度分多个连接),此时,ConnectionFactory的实现仍然使用CachingConnectionFactory的channel模式。这样,可以形成不同场景使用不同的连接(避免相互影响),同一连接又可为多个channel共享(提高性能)

附上CachingConnectionFactory的createConnection()方法源码

org.springframework.amqp.rabbit.connection.CachingConnectionFactory

@Override

public final Connection createConnection() throws AmqpException {

Assert.state(!this.stopped, "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");

synchronized (this.connectionMonitor) {

if (this.cacheMode == CacheMode.CHANNEL) {

if (this.connection.target == null) {

this.connection.target = super.createBareConnection();

// invoke the listener *after* this.connection is assigned

if (!this.checkoutPermits.containsKey(this.connection)) {

this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));

}

this.connection.closeNotified.set(false);

getConnectionListener().onCreate(this.connection);

}

return this.connection;

}

else if (this.cacheMode == CacheMode.CONNECTION) {

ChannelCachingConnectionProxy connection = findIdleConnection();

long now = System.currentTimeMillis();

while (connection == null && System.currentTimeMillis() - now

if (countOpenConnections() >= this.connectionLimit) {

try {

this.connectionMonitor.wait(this.channelCheckoutTimeout);

connection = findIdleConnection();

}

catch (InterruptedException e) {

Thread.currentThread().interrupt();

throw new AmqpException("Interrupted while waiting for a connection", e);

}

}

}

if (connection == null) {

if (countOpenConnections() >= this.connectionLimit

&& System.currentTimeMillis() - now >= this.channelCheckoutTimeout) {

throw new AmqpTimeoutException("Timed out attempting to get a connection");

}

connection = new ChannelCachingConnectionProxy(super.createBareConnection());

if (logger.isDebugEnabled()) {

logger.debug("Adding new connection '" + connection + "'");

}

this.allocatedConnections.add(connection);

this.allocatedConnectionNonTransactionalChannels.put(connection, new LinkedList());

this.channelHighWaterMarks.put(ObjectUtils.getIdentityHexString(

this.allocatedConnectionNonTransactionalChannels.get(connection)), new AtomicInteger());

this.allocatedConnectionTransactionalChannels.put(connection, new LinkedList());

this.channelHighWaterMarks.put(

ObjectUtils.getIdentityHexString(this.allocatedConnectionTransactionalChannels.get(connection)),

new AtomicInteger());

this.checkoutPermits.put(connection, new Semaphore(this.channelCacheSize));

getConnectionListener().onCreate(connection);

}

else if (!connection.isOpen()) {

try {

refreshProxyConnection(connection);

}

catch (Exception e) {

this.idleConnections.addLast(connection);

}

}

else {

if (logger.isDebugEnabled()) {

logger.debug("Obtained connection '" + connection + "' from cache");

}

}

return connection;

}

}

return null;

}



推荐阅读
  • 本文详细探讨了在Web开发中常见的UTF-8编码问题及其解决方案,包括HTML页面、PHP脚本、MySQL数据库以及JavaScript和Flash应用中的乱码问题。 ... [详细]
  • 服务器虚拟化存储设计,完美规划储存与资源,部署高性能虚拟化桌面
    规划部署虚拟桌面环境前,必须先估算目前所使用实体桌面环境的工作负载与IOPS性能,并慎选储存设备。唯有谨慎估算贴近实际的IOPS性能,才能 ... [详细]
  • 本文探讨了一个Web工程项目的需求,即允许用户随时添加定时任务,并通过Quartz框架实现这些任务的自动化调度。文章将介绍如何设计任务表以存储任务信息和执行周期,以及如何通过一个定期扫描机制自动识别并加载新任务到调度系统中。 ... [详细]
  • 本文由公众号【数智物语】(ID: decision_engine)发布,关注获取更多干货。文章探讨了从数据收集到清洗、建模及可视化的全过程,介绍了41款实用工具,旨在帮助数据科学家和分析师提升工作效率。 ... [详细]
  • 本文详细介绍了如何在Mac操作系统中为IntelliJ IDEA配置更高的内存限制,以提高开发效率和性能。 ... [详细]
  • 在使用 Nginx 作为服务器时,发现 Chrome 能正确从缓存中读取 CSS 和 JS 文件,而 Firefox 却无法有效利用缓存,导致加载速度显著变慢。 ... [详细]
  • 入门指南:使用FastRPC技术连接Qualcomm Hexagon DSP
    本文旨在为初学者提供关于如何使用FastRPC技术连接Qualcomm Hexagon DSP的基础知识。FastRPC技术允许开发者在本地客户端实现远程调用,从而简化Hexagon DSP的开发和调试过程。 ... [详细]
  • 本文提供了一个SQL脚本,用于在Microsoft SQL Server中创建一个数据字典视图,该视图详细列出了表名、表描述、字段名称、字段描述、字段类型、字段大小、字段精度、是否可为空、默认值以及是否为标识或主键等信息。 ... [详细]
  • Hadoop集群搭建:实现SSH无密码登录
    本文介绍了如何在CentOS 7 64位操作系统环境下配置Hadoop集群中的SSH无密码登录,包括环境准备、用户创建、密钥生成及配置等步骤。 ... [详细]
  • 本文探讨了Android系统中联系人数据库的设计,特别是AbstractContactsProvider类的作用与实现。文章提供了对源代码的详细分析,并解释了该类如何支持跨数据库操作及事务处理。源代码可从官方Android网站下载。 ... [详细]
  • 本文详细介绍了如何使用Linux下的mysqlshow命令来查询MySQL数据库的相关信息,包括数据库、表以及字段的详情。通过本文的学习,读者可以掌握mysqlshow命令的基本语法及其常用选项。 ... [详细]
  • STM32代码编写STM32端不需要写关于连接MQTT服务器的代码,连接的工作交给ESP8266来做,STM32只需要通过串口接收和发送数据,间接的与服务器交互。串口三配置串口一已 ... [详细]
  • 在测试软件或进行系统维护时,有时会遇到电脑蓝屏的情况,即便使用了沙盒环境也无法完全避免。本文将详细介绍常见的蓝屏错误代码及其解决方案,帮助用户快速定位并解决问题。 ... [详细]
  • 本文详细介绍了如何利用 Bootstrap Table 实现数据展示与操作,包括数据加载、表格配置及前后端交互等关键步骤。 ... [详细]
  • Logging all MySQL queries into the Slow Log
    MySQLoptionallylogsslowqueriesintotheSlowQueryLog–orjustSlowLog,asfriendscallit.However,Thereareseveralreasonstologallqueries.Thislistisnotexhaustive:Belowyoucanfindthevariablestochange,astheyshouldbewritteninth ... [详细]
author-avatar
夜翊灬瞳_398
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有