热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mq获取消息慢_rabbitmq消息发送卡顿的排查(springamqp)

问题描述线上系统出现SLOW慢请求告警,经排查,发现为rabbitmq的消息发送卡顿引起,卡顿时间几秒钟~几十秒不等。由于只卡顿了1分钟左

问题描述

线上系统出现SLOW慢请求告警,经排查,发现为rabbitmq的消息发送卡顿引起,卡顿时间几秒钟~几十秒不等。由于只卡顿了1分钟左右,笔数不多(30笔以内),且后续就没了,所以只是排查了下系统各项指标(JVM、mq状态、磁盘等),都正常。就没有继续深入了排查了。 但过了一段时间,又出现问题了~~(果然每个线上问题都要仔细追踪- -#)

发生频率:每次系统重启后,都会出现一次,且每个实例出现一次后,后续就不在出现了,直到下一次系统实例重启。 在出现卡顿时,当前实例仍有其他的消息发送是正常的。

单个消息大小:1KB以内。

使用情况描述

rabbitmq:3.7.17, erlang:22.0.7

采用spirng-amqp包进行封装与rabbitmq的交互,版本为1.7.3.RELEASE

使用Spring-amqp的CachingConnectionFactory缓存连接工厂

调整生产者的cache-mode为Connection模式(原先为CHANNEL)

调整生产者的核心连接数量为24个,最大连接数量为48个

设置生产者连接的channel.checkout-timeout为3秒(该参数的旨意是从CachingConnectionFactory获取connection/channel的等待时间,CachingConnectionFactory实际就是个connection或者channel的缓存池子)

排查一: rabbitmq的per-connection连接流控引起的客户端消息发送卡顿

由于通过日志已经能够确定是消息发送这一步产生了卡顿,那初步怀疑就是由于rabbitmq的流控机制引起的卡顿,但根据监控,当时卡顿时间内,rabbitmq的所有connection/channel的状态都是正常的,没有存在flow/block状态的情况。(rabbitmq的监控API:{rabbitmq-admin管理后台域名}/api/index.html)

所以排除是rabbitmq服务器的流控引起的

排查二: 应用客户端自己的问题

由于出现问题是某次spring-amqp的缓存模式cache-mode从channel改为了connection(PS:此处调整,是因为channel模式为单连接,所有的MQ消息,无论各种业务场景都是共用了这个连接,之前发生了一次per-connection的流控,导致这个连接下的所有消息都发不出去,为了降低业务影响,将单连接改造为多连接模式)

所以把目光转向了spring-amqp的使用上,经过代码排查,果然是spring-amqp的一个BUG。。

在使用CachingConnectionFactory的createConnection()创建rabbitmq连接的时候,会存在一个同步锁(由于该工厂就是个缓存池,所以需要同步锁来确保缓存的个数不超过配置的MAX个数),知道连接创建完毕后,才会释放该锁,那么如果存在某个连接创建速度过慢,就会导致后续的连接创建时间也卡顿,都在等待该同步锁的释放

如果cache-mode是connection模式,根据源码展示的,只要从连接缓存池没有拿到一个空闲的connection,就直接等待一个checkout-timeout的时间(配置的是3秒)。这就代表,每一个连接的创建,都必须等待3秒钟。。这就解释了为啥线上的卡顿基本都是3、6、9秒左右。

修复代码也很简单:

但升级spring-amqp包到已修复的版本是2.1.x 中间版本跨度太大, 且2.1.x需要依赖spring 5,我们spring的基础版本还是4.x,贸然升级风险太大

解决

结合业务场景,此处MQ的消息场景是用于将交易请求做异步化处理,达到一个削峰填谷的目的。 改造为多连接模式,是由于之前单连接模式发生了流控后,导致使用该连接的所有业务场景均block。虽然改为了多连接,但实际上还是存在各个业务共享某个连接的情况,隔离性还不是特别高,应该是按重要程度进行分级隔离使用各自的连接。

经过考量,决定使用Spring提供的RoutingConnectionFactory,通过将业务场景码来分隔实际的ConnectionFactory(实际还是CachingConectionFactory,采用单连接channel模式),来达到业务场景隔离连接。

PS:连接并不是越多越好,我们此处只是将业务高等级的区分各个连接,其余实时性要求不是那么高的,单独在共享使用另外一个连接

RoutingConnectionFactory,使用此方式,可根据Expression灵活将连接(connection)分为多个,并根据自己场景定制(例如生产和消费连接分离,生产,消费者又按照不同维度分多个连接),此时,ConnectionFactory的实现仍然使用CachingConnectionFactory的channel模式。这样,可以形成不同场景使用不同的连接(避免相互影响),同一连接又可为多个channel共享(提高性能)

附上CachingConnectionFactory的createConnection()方法源码

org.springframework.amqp.rabbit.connection.CachingConnectionFactory

@Override

public final Connection createConnection() throws AmqpException {

Assert.state(!this.stopped, "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");

synchronized (this.connectionMonitor) {

if (this.cacheMode == CacheMode.CHANNEL) {

if (this.connection.target == null) {

this.connection.target = super.createBareConnection();

// invoke the listener *after* this.connection is assigned

if (!this.checkoutPermits.containsKey(this.connection)) {

this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));

}

this.connection.closeNotified.set(false);

getConnectionListener().onCreate(this.connection);

}

return this.connection;

}

else if (this.cacheMode == CacheMode.CONNECTION) {

ChannelCachingConnectionProxy connection = findIdleConnection();

long now = System.currentTimeMillis();

while (connection == null && System.currentTimeMillis() - now

if (countOpenConnections() >= this.connectionLimit) {

try {

this.connectionMonitor.wait(this.channelCheckoutTimeout);

connection = findIdleConnection();

}

catch (InterruptedException e) {

Thread.currentThread().interrupt();

throw new AmqpException("Interrupted while waiting for a connection", e);

}

}

}

if (connection == null) {

if (countOpenConnections() >= this.connectionLimit

&& System.currentTimeMillis() - now >= this.channelCheckoutTimeout) {

throw new AmqpTimeoutException("Timed out attempting to get a connection");

}

connection = new ChannelCachingConnectionProxy(super.createBareConnection());

if (logger.isDebugEnabled()) {

logger.debug("Adding new connection '" + connection + "'");

}

this.allocatedConnections.add(connection);

this.allocatedConnectionNonTransactionalChannels.put(connection, new LinkedList());

this.channelHighWaterMarks.put(ObjectUtils.getIdentityHexString(

this.allocatedConnectionNonTransactionalChannels.get(connection)), new AtomicInteger());

this.allocatedConnectionTransactionalChannels.put(connection, new LinkedList());

this.channelHighWaterMarks.put(

ObjectUtils.getIdentityHexString(this.allocatedConnectionTransactionalChannels.get(connection)),

new AtomicInteger());

this.checkoutPermits.put(connection, new Semaphore(this.channelCacheSize));

getConnectionListener().onCreate(connection);

}

else if (!connection.isOpen()) {

try {

refreshProxyConnection(connection);

}

catch (Exception e) {

this.idleConnections.addLast(connection);

}

}

else {

if (logger.isDebugEnabled()) {

logger.debug("Obtained connection '" + connection + "' from cache");

}

}

return connection;

}

}

return null;

}



推荐阅读
  • 解决Sharepoint 2013运行状况分析出现的“一个或多个服务器未响应”问题的方法
    本文介绍了解决Sharepoint 2013运行状况分析中出现的“一个或多个服务器未响应”问题的方法。对于有高要求的客户来说,系统检测问题的存在是不可接受的。文章详细描述了解决该问题的步骤,包括删除服务器、处理分布式缓存留下的记录以及使用代码等方法。同时还提供了相关关键词和错误提示信息,以帮助读者更好地理解和解决该问题。 ... [详细]
  • Spring框架《一》简介
    Spring框架《一》1.Spring概述1.1简介1.2Spring模板二、IOC容器和Bean1.IOC和DI简介2.三种通过类型获取bean3.给bean的属性赋值3.1依赖 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Whatsthedifferencebetweento_aandto_ary?to_a和to_ary有什么区别? ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • SpringBoot+SpringCache实现两级缓存(Redis+Caffeine)_java
    这篇文章主要介绍了SpringBoot+SpringCache实现两级缓存(Redis+Caffeine),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价 ... [详细]
  • smarty(模板引擎,模板技术)使用smarty主要是为了实现逻辑和外在内容的分离;特点:1、速度快 ... [详细]
  • Yii数据库缓存实例分析【PHP】
    后端开发|php教程Yii,数据库,缓存后端开发-php教程源码zhijia,vscodec必备工具,ubuntu设置fat,tomcat链接被关闭,海淀爬虫,php5.6安装扩展 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 在编写业务代码时,常常会遇到复杂的业务逻辑导致代码冗长混乱的情况。为了解决这个问题,可以利用中间件模式来简化代码逻辑。中间件模式可以帮助我们更好地设计架构和代码,提高代码质量。本文介绍了中间件模式的基本概念和用法。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • express工程中的json调用方法
    本文介绍了在express工程中如何调用json数据,包括建立app.js文件、创建数据接口以及获取全部数据和typeid为1的数据的方法。 ... [详细]
author-avatar
夜翊灬瞳_398
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有