热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

KafkaBroker源码解析二:API层设计

一、简介版本:1.1.1API层,是一个Facade模式,封装了Kafka所有功能对外提供服务,通过请求中的ApiKeys,进行请求分发,调用对应的API进行处理API层,创建了个

一、简介



  • 版本:1.1.1

  • API层,是一个Facade模式,封装了Kafka所有功能对外提供服务,通过请求中的ApiKeys,进行请求分发,调用对应的API进行处理

  • API层,创建了个线程用于进行逻辑处理、IO操作,所有的API行为均线程中完成


二、整体架构


2.1 核心逻辑


相对于网络层,API层的实现相当简单,只是对底层实现的封装,得益于此,从API层几乎就能了解到Kafka broker所能提供的全部功能




  1. 启动num.io.threads个RequestHandler

  2. 每个RequestHandler都从全局requestQueue中poll Request

  3. 根据Request的ApiKeys调用对应的handle*Api进行实际的业务逻辑处理


2.2 核心类、方法介绍

KafkaRequestHandler
|-- run()
KafkaApis
|-- handle() // 所有请求的入口,根据ApiKeys分发请求

三、核心流程分析


3.1 启动流程


启动流程前半部分和网络层一致,这里不再赘述


// KafkaServer.scala
def startup() {
// 初始化api层
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
// 初始化IO线程池
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
}
// KafkaRequestHandler.scala
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}
}

实际就是实例化API层的入口,再启动num.io.threads个RequestHandler线程,用于实际处理请求,值得注意的是这里的线程池,其实不是真正意义上的线程池,线程数目是固定的,只有通过动态参数改变线程池大小时,才会重新调整线程数目

def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to $newSize")
if (newSize > currentSize) {
// 补上差额的线程
for (i <- currentSize until newSize) {
createHandler(i)
}
} else if (newSize for (i <- 1 to (currentSize - newSize)) {
// 关闭部分线程直到达到目标值
runnables.remove(currentSize - i).stop()
}
}
threadPoolSize.set(newSize)
}

3.2 请求分发流程


RequestHandler线程启动后,每个线程都一直从网络层获取Request再交给KafkaApis进行请求分发


// KafkaRequestHandler.scala
def run() {
while (!stopped) {
// 从全局的阻塞队列里面拉取请求(网络层处理完并反序列化后转换成request)
val req = requestChannel.receiveRequest(300)
req match {
// 关闭命令
case RequestChannel.ShutdownRequest =>
shutdownComplete.countDown()
return
// 正常请求
case request: RequestChannel.Request =>
// 请求路由分发
apis.handle(request)
case null => // continue
}
}
shutdownComplete.countDown()
}
// RequestChannel.scala
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

// KafkaApis.scala
def handle(request: RequestChannel.Request) {
// 只是根据request中的apiKey分发请求,实际处理逻辑由对应的handle*方法实现
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIOnS=> handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_COnFIGS=> handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_COnFIGS=> handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIOnS=> handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
}


  1. RequestHandler从全局阻塞队列获取网络层组装完的Request,并调用KafkaApi的handle方法

  2. handle方法类似web开发中的dispatch,根据ApiKeys调用对应的handle*执行实际业务逻辑

可以看到API层只是对执行线程、根据ApiKeys进行请求路由的封装。实际逻辑由KafkaApis类中的handle*方法实现,而其中最核心的方法为PRODUCE与FETCH请求,分别对应消息的生产与消费,下面的文章将专门针对这两种API进行源码分析


同类文章



  • Kafka Broker源码解析一:网络层设计

  • Kafka Broker源码解析三:PRODUCE请求 TODO

  • Kafka Broker源码解析四:FETCH请求求 TODO


推荐阅读
  • Python 内存管理机制详解
    本文深入探讨了Python的内存管理机制,涵盖了垃圾回收、引用计数和内存池机制。通过具体示例和专业解释,帮助读者理解Python如何高效地管理和释放内存资源。 ... [详细]
  • C#设计模式学习笔记:观察者模式解析
    本文将探讨观察者模式的基本概念、应用场景及其在C#中的实现方法。通过借鉴《Head First Design Patterns》和维基百科等资源,详细介绍该模式的工作原理,并提供具体代码示例。 ... [详细]
  • Appium + Java 自动化测试中处理页面空白区域点击问题
    在进行移动应用自动化测试时,有时会遇到某些页面没有返回按钮,只能通过点击空白区域返回的情况。本文将探讨如何在Appium + Java环境中有效解决此类问题,并提供详细的解决方案。 ... [详细]
  • 利用Selenium与ChromeDriver实现豆瓣网页全屏截图
    本文介绍了一种使用Selenium和ChromeDriver结合Python代码,轻松实现对豆瓣网站进行完整页面截图的方法。该方法不仅简单易行,而且解决了新版Selenium不再支持PhantomJS的问题。 ... [详细]
  • 解决TensorFlow CPU版本安装中的依赖问题
    本文记录了在安装CPU版本的TensorFlow过程中遇到的依赖问题及解决方案,特别是numpy版本不匹配和动态链接库(DLL)错误。通过详细的步骤说明和专业建议,帮助读者顺利安装并使用TensorFlow。 ... [详细]
  • 探索新一代API文档工具,告别Swagger的繁琐
    对于后端开发者而言,编写和维护API文档既繁琐又不可或缺。本文将介绍一款全新的API文档工具,帮助团队更高效地协作,简化API文档生成流程。 ... [详细]
  • 探讨 HDU 1536 题目,即 S-Nim 游戏的博弈策略。通过 SG 函数分析游戏胜负的关键,并介绍如何编程实现解决方案。 ... [详细]
  • 深入解析动态代理模式:23种设计模式之三
    在设计模式中,动态代理模式是应用最为广泛的一种代理模式。它允许我们在运行时动态创建代理对象,并在调用方法时进行增强处理。本文将详细介绍动态代理的实现机制及其应用场景。 ... [详细]
  • 通常情况下,修改my.cnf配置文件后需要重启MySQL服务才能使新参数生效。然而,通过特定命令可以在不重启服务的情况下实现配置的即时更新。本文将详细介绍如何在线调整MySQL配置,并验证其有效性。 ... [详细]
  • Python自动化测试入门:Selenium环境搭建
    本文详细介绍如何在Python环境中安装和配置Selenium,包括开发工具PyCharm的安装、Python环境的设置以及Selenium包的安装方法。此外,还提供了编写和运行第一个自动化测试脚本的步骤。 ... [详细]
  • 本文探讨了在 SQL Server 中使用 JDBC 插入数据时遇到的问题。通过详细分析代码和数据库配置,提供了解决方案并解释了潜在的原因。 ... [详细]
  • 本题要求在一组数中反复取出两个数相加,并将结果放回数组中,最终求出最小的总加法代价。这是一个经典的哈夫曼编码问题,利用贪心算法可以有效地解决。 ... [详细]
  • 本文探讨了C++编程中理解代码执行期间复杂度的挑战,特别是编译器在程序运行时生成额外指令以确保对象构造、内存管理、类型转换及临时对象创建的安全性。 ... [详细]
  • 本文详细介绍了如何解决 Microsoft SQL Server 中用户 'sa' 登录失败的问题。错误代码为 18470,提示该帐户已被禁用。我们将通过 Windows 身份验证方式登录,并启用 'sa' 帐户以恢复其访问权限。 ... [详细]
  • 嵌入式开发环境搭建与文件传输指南
    本文详细介绍了如何为嵌入式应用开发搭建必要的软硬件环境,并提供了通过串口和网线两种方式将文件传输到开发板的具体步骤。适合Linux开发初学者参考。 ... [详细]
author-avatar
杀手也热血_949
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有