问题描述
线上系统出现SLOW慢请求告警,经排查,发现为rabbitmq的消息发送卡顿引起,卡顿时间几秒钟~几十秒不等。由于只卡顿了1分钟左右,笔数不多(30笔以内),且后续就没了,所以只是排查了下系统各项指标(JVM、mq状态、磁盘等),都正常。就没有继续深入了排查了。 但过了一段时间,又出现问题了~~(果然每个线上问题都要仔细追踪- -#)
发生频率:每次系统重启后,都会出现一次,且每个实例出现一次后,后续就不在出现了,直到下一次系统实例重启。 在出现卡顿时,当前实例仍有其他的消息发送是正常的。
单个消息大小:1KB以内。
使用情况描述
rabbitmq:3.7.17, erlang:22.0.7
采用spirng-amqp包进行封装与rabbitmq的交互,版本为1.7.3.RELEASE
使用Spring-amqp的CachingConnectionFactory缓存连接工厂
调整生产者的cache-mode为Connection模式(原先为CHANNEL)
调整生产者的核心连接数量为24个,最大连接数量为48个
设置生产者连接的channel.checkout-timeout为3秒(该参数的旨意是从CachingConnectionFactory获取connection/channel的等待时间,CachingConnectionFactory实际就是个connection或者channel的缓存池子)
排查一: rabbitmq的per-connection连接流控引起的客户端消息发送卡顿
由于通过日志已经能够确定是消息发送这一步产生了卡顿,那初步怀疑就是由于rabbitmq的流控机制引起的卡顿,但根据监控,当时卡顿时间内,rabbitmq的所有connection/channel的状态都是正常的,没有存在flow/block状态的情况。(rabbitmq的监控API:{rabbitmq-admin管理后台域名}/api/index.html)
所以排除是rabbitmq服务器的流控引起的
排查二: 应用客户端自己的问题
由于出现问题是某次spring-amqp的缓存模式cache-mode从channel改为了connection(PS:此处调整,是因为channel模式为单连接,所有的MQ消息,无论各种业务场景都是共用了这个连接,之前发生了一次per-connection的流控,导致这个连接下的所有消息都发不出去,为了降低业务影响,将单连接改造为多连接模式)
所以把目光转向了spring-amqp的使用上,经过代码排查,果然是spring-amqp的一个BUG。。
在使用CachingConnectionFactory的createConnection()创建rabbitmq连接的时候,会存在一个同步锁(由于该工厂就是个缓存池,所以需要同步锁来确保缓存的个数不超过配置的MAX个数),知道连接创建完毕后,才会释放该锁,那么如果存在某个连接创建速度过慢,就会导致后续的连接创建时间也卡顿,都在等待该同步锁的释放
如果cache-mode是connection模式,根据源码展示的,只要从连接缓存池没有拿到一个空闲的connection,就直接等待一个checkout-timeout的时间(配置的是3秒)。这就代表,每一个连接的创建,都必须等待3秒钟。。这就解释了为啥线上的卡顿基本都是3、6、9秒左右。
修复代码也很简单:
但升级spring-amqp包到已修复的版本是2.1.x 中间版本跨度太大, 且2.1.x需要依赖spring 5,我们spring的基础版本还是4.x,贸然升级风险太大
解决
结合业务场景,此处MQ的消息场景是用于将交易请求做异步化处理,达到一个削峰填谷的目的。 改造为多连接模式,是由于之前单连接模式发生了流控后,导致使用该连接的所有业务场景均block。虽然改为了多连接,但实际上还是存在各个业务共享某个连接的情况,隔离性还不是特别高,应该是按重要程度进行分级隔离使用各自的连接。
经过考量,决定使用Spring提供的RoutingConnectionFactory,通过将业务场景码来分隔实际的ConnectionFactory(实际还是CachingConectionFactory,采用单连接channel模式),来达到业务场景隔离连接。
PS:连接并不是越多越好,我们此处只是将业务高等级的区分各个连接,其余实时性要求不是那么高的,单独在共享使用另外一个连接
RoutingConnectionFactory,使用此方式,可根据Expression灵活将连接(connection)分为多个,并根据自己场景定制(例如生产和消费连接分离,生产,消费者又按照不同维度分多个连接),此时,ConnectionFactory的实现仍然使用CachingConnectionFactory的channel模式。这样,可以形成不同场景使用不同的连接(避免相互影响),同一连接又可为多个channel共享(提高性能)
附上CachingConnectionFactory的createConnection()方法源码
org.springframework.amqp.rabbit.connection.CachingConnectionFactory
@Override
public final Connection createConnection() throws AmqpException {
Assert.state(!this.stopped, "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
synchronized (this.connectionMonitor) {
if (this.cacheMode == CacheMode.CHANNEL) {
if (this.connection.target == null) {
this.connection.target = super.createBareConnection();
// invoke the listener *after* this.connection is assigned
if (!this.checkoutPermits.containsKey(this.connection)) {
this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
}
this.connection.closeNotified.set(false);
getConnectionListener().onCreate(this.connection);
}
return this.connection;
}
else if (this.cacheMode == CacheMode.CONNECTION) {
ChannelCachingConnectionProxy connection = findIdleConnection();
long now = System.currentTimeMillis();
while (connection == null && System.currentTimeMillis() - now if (countOpenConnections() >= this.connectionLimit) {
try {
this.connectionMonitor.wait(this.channelCheckoutTimeout);
connection = findIdleConnection();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AmqpException("Interrupted while waiting for a connection", e);
}
}
}
if (connection == null) {
if (countOpenConnections() >= this.connectionLimit
&& System.currentTimeMillis() - now >= this.channelCheckoutTimeout) {
throw new AmqpTimeoutException("Timed out attempting to get a connection");
}
connection = new ChannelCachingConnectionProxy(super.createBareConnection());
if (logger.isDebugEnabled()) {
logger.debug("Adding new connection '" + connection + "'");
}
this.allocatedConnections.add(connection);
this.allocatedConnectionNonTransactionalChannels.put(connection, new LinkedList());
this.channelHighWaterMarks.put(ObjectUtils.getIdentityHexString(
this.allocatedConnectionNonTransactionalChannels.get(connection)), new AtomicInteger());
this.allocatedConnectionTransactionalChannels.put(connection, new LinkedList());
this.channelHighWaterMarks.put(
ObjectUtils.getIdentityHexString(this.allocatedConnectionTransactionalChannels.get(connection)),
new AtomicInteger());
this.checkoutPermits.put(connection, new Semaphore(this.channelCacheSize));
getConnectionListener().onCreate(connection);
}
else if (!connection.isOpen()) {
try {
refreshProxyConnection(connection);
}
catch (Exception e) {
this.idleConnections.addLast(connection);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Obtained connection '" + connection + "' from cache");
}
}
return connection;
}
}
return null;
}