热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

从消费者看rebalance

kafkajava客户端发送请求,大量使用RequestFuture,因此先说明下该类。RequestFuture类的成员属性listeners是RequestFutureList

kafka java 客户端发送请求,大量使用 RequestFuture,因此先说明下该类。

RequestFuture 类的成员属性 listeners 是 RequestFutureListener 的集合,调用 complete 方法,会触发 listener 的 onSuccess 方法。

public void complete(T value) {
try {
if (value instanceof RuntimeException)
throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
fireSuccess();
} finally {
completedLatch.countDown();
}
}
private void fireSuccess() {
T value = value();
while (true) {
RequestFutureListener listener = listeners.poll();
if (listener == null)
break;
listener.onSuccess(value);
}
}

值得关注的是 compose 和 chain 方法,这两个方法均是为当前 RequestFuture 添加 listener,listener 的 onSuccess 又是调用另一个 RequestFuture 的方法。

public RequestFuture compose(final RequestFutureAdapter adapter) {
// 创建新的 RequestFuture 对象
final RequestFuture adapted = new RequestFuture<>();
// 为旧的 RequestFuture 添加 listener
addListener(new RequestFutureListener() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted);
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
// 返回新的 RequestFuture 对象
return adapted;
}
public void chain(final RequestFuture future) {
// 为当前 RequestFuture 添加 listener
addListener(new RequestFutureListener() {
@Override
public void onSuccess(T value) {
future.complete(value);
}
@Override
public void onFailure(RuntimeException e) {
future.raise(e);
}
});
}

rebalance 入口在 ConsumerCoordinator#poll

客户端判断是否需要重新加入组,即 rebalance

//ConsumerCoordinator#needRejoin
public boolean needRejoin() {
if (!subscriptions.partitionsAutoAssigned())
return false;
// 所订阅 topic 的分区数量发生变化
// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
return true;
// 所订阅的 topic 发生变化
// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
return true;
// 消费者加入组,或退出组,由心跳线程设置 rejoinNeeded = true
return super.needRejoin();
}

消费者开始 rebalance

// AbstractCoordinator#joinGroupIfNeeded
void joinGroupIfNeeded() {
while (needRejoin() || rejoinIncomplete()) {
ensureCoordinatorReady();
if (needsJoinPrepare) {
// 调用用户传入的 ConsumerRebalanceListener
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
// 发送 join group 的请求
RequestFuture future = initiateJoinGroup();
client.poll(future);
if (future.succeeded()) {
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
resetJoinGroupFuture();
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException)
continue;
else if (!future.isRetriable())
throw exception;
time.sleep(retryBackoffMs);
}
}
}

AbstractCoordinator#initiateJoinGroup

private synchronized RequestFuture initiateJoinGroup() {
if (joinFuture == null) {
disableHeartbeatThread();
state = MemberState.REBALANCING;
joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener() {
@Override
public void onSuccess(ByteBuffer value) {
// handle join completion in the callback so that the callback will be invoked
// even if the consumer is woken up before finishing the rebalance
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
state = MemberState.STABLE;
rejoinNeeded = false;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
// we handle failures below after the request finishes. if the join completes
// after having been woken up, the exception is ignored and we will rejoin
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}

AbstractCoordinator#sendJoinGroupRequest

private RequestFuture sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
groupId,
this.sessionTimeoutMs,
this.generation.memberId,
protocolType(),
metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
return client.send(coordinator, requestBuilder)
.compose(new JoinGroupResponseHandler());
}

重点关注 client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
为老的 RequestFuture 添加 listener,返回新的 RequestFuture

ConsumerNetworkClient#send

public RequestFuture send(Node node, AbstractRequest.Builder requestBuilder) {
long now = time.milliseconds();
// 使用 RequestFutureCompletionHandler 作为回调函数
RequestFutureCompletionHandler completiOnHandler= new RequestFutureCompletionHandler();
ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
completionHandler);
unsent.put(node, clientRequest);
// wakeup the client in case it is blocking in poll so that we can send the queued request
client.wakeup();
return completionHandler.future;
}

JoinGroupResponseHandler#handle

public void handle(JoinGroupResponse joinResponse, RequestFuture future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
sensors.joinLatency.record(response.requestLatencyMs());
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
joinResponse.memberId(), joinResponse.groupProtocol());
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
onJoinFollower().chain(future);
}
}
}
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
// backoff and retry
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
resetGeneration();
log.debug("Attempt to join group failed due to unknown member id.");
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
future.raise(error);
} else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
|| error == Errors.INVALID_SESSION_TIMEOUT
|| error == Errors.INVALID_GROUP_ID) {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}

收到响应后,最终的执行流是 RequestFutureCompletionHandler -> JoinGroupResponseHandler#handle

private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) {
try {
// perform the leader synchronization and send back the assignment for the group
Map groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
joinResponse.members());
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
private RequestFuture onJoinFollower() {
// send follower's sync group with an empty assignment
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
Collections.emptyMap());
log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
return sendSyncGroupRequest(requestBuilder);
}
private RequestFuture sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, requestBuilder)
.compose(new SyncGroupResponseHandler());
}

用 RequestFuture 把 JoinGroupResponseHandler 和 SyncGroupResponseHandler 串联起来了

private class SyncGroupResponseHandler extends CoordinatorResponseHandler {
@Override
public void handle(SyncGroupResponse syncResponse,
RequestFuture future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
} else {
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
log.debug("SyncGroup failed: {}", error.message());
resetGeneration();
future.raise(error);
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
log.debug("SyncGroup failed: {}", error.message());
markCoordinatorUnknown();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
}

 rebalance 过程最后的 listener

joinFuture.addListener(new RequestFutureListener() {
@Override
public void onSuccess(ByteBuffer value) {
// handle join completion in the callback so that the callback will be invoked
// even if the consumer is woken up before finishing the rebalance
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
state = MemberState.STABLE;
rejoinNeeded = false;
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
@Override
public void onFailure(RuntimeException e) {
// we handle failures below after the request finishes. if the join completes
// after having been woken up, the exception is ignored and we will rejoin
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED;
}
}
});

 



推荐阅读
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 在Cisco IOS XR系统中,存在提供服务的服务器和使用这些服务的客户端。本文深入探讨了进程与线程状态转换机制,分析了其在系统性能优化中的关键作用,并提出了改进措施,以提高系统的响应速度和资源利用率。通过详细研究状态转换的各个环节,本文为开发人员和系统管理员提供了实用的指导,旨在提升整体系统效率和稳定性。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 本文详细介绍了一种利用 ESP8266 01S 模块构建 Web 服务器的成功实践方案。通过具体的代码示例和详细的步骤说明,帮助读者快速掌握该模块的使用方法。在疫情期间,作者重新审视并研究了这一未被充分利用的模块,最终成功实现了 Web 服务器的功能。本文不仅提供了完整的代码实现,还涵盖了调试过程中遇到的常见问题及其解决方法,为初学者提供了宝贵的参考。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 尽管我们尽最大努力,任何软件开发过程中都难免会出现缺陷。为了更有效地提升对支持部门的协助与支撑,本文探讨了多种策略和最佳实践,旨在通过改进沟通、增强培训和支持流程来减少这些缺陷的影响,并提高整体服务质量和客户满意度。 ... [详细]
  • 投融资周报 | Circle 达成 4 亿美元融资协议,唯一艺术平台 A 轮融资超千万美元 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 本文介绍如何通过 Python 的 `unittest` 和 `functools` 模块封装一个依赖方法,用于管理测试用例之间的依赖关系。该方法能够确保在某个测试用例失败时,依赖于它的其他测试用例将被跳过。 ... [详细]
  • 在C#中开发MP3播放器时,我正在考虑如何高效存储元数据以便快速检索。选择合适的数据结构,如字典或数组,对于优化性能至关重要。字典能够提供快速的键值对查找,而数组则在连续存储和遍历方面表现优异。根据具体需求,合理选择数据结构将显著提升应用的响应速度和用户体验。 ... [详细]
author-avatar
飘泊的牛小盆友
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有