消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQ、kafka、rocketMQ、ActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0。
HeartbeatThread分析
Coordinator中最核心的内容是之前提到的重平衡过程,如果对之前的文章还有印象的话可以知道,重平衡发生的原因之一是心跳超时。消费者心跳的作用是协调者用来检查该消费者是否在正常,如果不正常的话,可以立马替换消费者来消费该分区。那么本文主要介绍消费者心跳线程(HeartbeatThread)的执行逻辑和相关源码操作。
启动心跳线程
HeartbeatThread是AbstractCoordinator的一个内部类,继承了kafkaThread,所以本质上是一个线程。从之前的文章中我们有提到,HeartbeatThread的创建是在第一次poll的时候创建的,并不是在消费者初始化的时候创建,这是因为消费者需要找到对应的协调者才能发送心跳线程,创建函数的调用方是AbstractCoordinator类中的ensureActiveGroup函数。
boolean ensureActiveGroup(final Timer timer) {
//...
//启动心跳
startHeartbeatThreadIfNeeded();
return joinGroupIfNeeded(timer);
}
//如果不存在心跳,创建。所以这里是加锁进行判断的,保证线程安全
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
}
}
心跳逻辑
心跳线程启动之后,在每次消费者poll的时候,就会唤醒心跳线程进行心跳的发送,调用函数是pollHeartbeat。心跳本身是一个线程,其核心在run函数的实现。现在我们看下这个类的实现。
HeartbeatThread类有三个成员变量,
private class HeartbeatThread extends KafkaThread implements AutoCloseable {
//是否允许心跳标志位,写入要加锁
private boolean enabled = false;
//是否关闭标志位,写入要加锁
private boolean closed = false;
//心跳失败的原因,初始化为null,使用原子整型
private final AtomicReference failed = new AtomicReference<>(null);
}
了解完这几个变量的作用之后,我们看下run的实现逻辑:
从上面的逻辑中可以看到,主要的实现是判断心跳异常处理,如果不是关闭的话,都会重新进入循环体,只有在关闭的情况下才会退出循环。下面看下具体的代码实现。
public void run() {
//...
//主循环体,心跳线程是一个死循环
while (true) {
synchronized (AbstractCoordinator.this) {
//是否关闭
if (closed)
return;
//是否禁止
if (!enabled) {
AbstractCoordinator.this.wait();
continue;
}
//状态不对或者心跳失败
if (state.hasNotJoinedGroup() || hasFailed()) {
disable();
continue;
}
//唤醒客服端
client.pollNoWakeup();
long now = time.milliseconds();
//协调者未知
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null) {
clearFindCoordinatorFuture();
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
lookupCoordinator();
}
//获取服务端心跳响应超时,置为协调者未知,等待下一轮
} else if (heartbeat.sessionTimeoutExpired(now)) {
markCoordinatorUnknown("session timed out without receiving a "
+ "heartbeat response");
//心跳超时,处理上次消息的时间过长,导致长时间没有poll
} else if (heartbeat.pollTimeoutExpired(now)) {
//发送离开请求,这个会导致reblance
maybeLeaveGroup(leaveReason);
//是否到了心跳时间
} else if (!heartbeat.shouldHeartbeat(now)) {
//阻塞到下一次重试
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
//设置心跳的时间为当前时间,这个是更新本地心跳的时间
heartbeat.sentHeartbeat(now);
//发送心跳请求并获取异步future对象
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
//添加监听器
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
@Override
//心跳成功
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat();
}
}
@Override
public void onFailure(RuntimeException e) {
synchronized (AbstractCoordinator.this) {
//正在reblance当做正常响应
if (e instanceof RebalanceInProgressException) {
heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
heartbeatThread.failed.set(e);
} else {
heartbeat.failHeartbeat();
// 唤醒线程,等待下一轮
AbstractCoordinator.this.notify();
}
}
}
});
}
}
}
}
//...
//异常处理逻辑
}
心跳发送和响应
在上面的心跳逻辑中,心跳的发送函数是sendHeartbeatRequest,在这里面会组装心跳消息并发送心跳,然后组装响应的结果。下面看下这段逻辑的实现:
sendHeartbeatRequest:发送消息
synchronized RequestFuture sendHeartbeatRequest() {
//组装心跳消息,包含消费者组信息,消费者信息
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
//发送消息
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler(generation));
}
处理结果是用一个新的类HeartbeatResponseHandler来实现,现在可以看下这个类的handle方法:
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) {
//处理的异常
Errors error = heartbeatResponse.error();
//没有异常,future成功返回
if (error == Errors.NONE) {
future.complete(null);
//协调者未知
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
markCoordinatorUnknown(error);
future.raise(error);
//正在reblance
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
if (state == MemberState.STABLE) {
//如果是稳定状态,请求重新加入
requestRejoin();
future.raise(error);
} else {
//如果不是stable状态,忽视
future.complete(null);
}
//leader版本已经替换
} else if (error == Errors.ILLEGAL_GENERATION ||
error == Errors.UNKNOWN_MEMBER_ID ||
error == Errors.FENCED_INSTANCE_ID) {
if (generationUnchanged()) {
resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
future.raise(error);
} else {
future.complete(null);
}
//没权限
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
//其他异常
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
在这里本身就是一个对异常的处理类,服务器返回的不同异常要分类返回给调用者,调用者根据不同的异常类型判断是否重试等操作。在实际开发项目中,也是要求对所有的异常要进行分类,这样方便针对不同的异常做不同的逻辑处理。
本文主要介绍了消费者源码中重要的一个环境--心跳线程。心跳是kafka用来管理消费者是否状态ok的情况。在这里一个很重要的问题就是如果我们拉取的消息过多,导致处理消息的时间过长,那消费者就会过久时间没有进行poll,这样就会导致上面的消费者离开集群并发生reblance的过程。这个在实际中还是有可能会出现的,出现这种情况的话,一个可能是消息有问题,处理的线程挂了,这种情况下我们需要将这条消息直接过滤掉;另一个就是消息确实需要处理很久的时间,这种情况下,我们需要设置更长的消息处理时间保证心跳check是ok的,如果最长的时间还是不够的话,我们可能需要用一个异步线程进行消息处理。心跳是一个很重要的内容,我们需要重点了解,不然在实际中可能会出现较多棘手的问题。
本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。