热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

KafkaBroker源码解析二:API层设计

一、简介版本:1.1.1API层,是一个Facade模式,封装了Kafka所有功能对外提供服务,通过请求中的ApiKeys,进行请求分发,调用对应的API进行处理API层,创建了个

一、简介



  • 版本:1.1.1

  • API层,是一个Facade模式,封装了Kafka所有功能对外提供服务,通过请求中的ApiKeys,进行请求分发,调用对应的API进行处理

  • API层,创建了个线程用于进行逻辑处理、IO操作,所有的API行为均线程中完成


二、整体架构


2.1 核心逻辑


相对于网络层,API层的实现相当简单,只是对底层实现的封装,得益于此,从API层几乎就能了解到Kafka broker所能提供的全部功能




  1. 启动num.io.threads个RequestHandler

  2. 每个RequestHandler都从全局requestQueue中poll Request

  3. 根据Request的ApiKeys调用对应的handle*Api进行实际的业务逻辑处理


2.2 核心类、方法介绍

KafkaRequestHandler
|-- run()
KafkaApis
|-- handle() // 所有请求的入口,根据ApiKeys分发请求

三、核心流程分析


3.1 启动流程


启动流程前半部分和网络层一致,这里不再赘述


// KafkaServer.scala
def startup() {
// 初始化api层
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
// 初始化IO线程池
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
}
// KafkaRequestHandler.scala
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}
}

实际就是实例化API层的入口,再启动num.io.threads个RequestHandler线程,用于实际处理请求,值得注意的是这里的线程池,其实不是真正意义上的线程池,线程数目是固定的,只有通过动态参数改变线程池大小时,才会重新调整线程数目

def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to $newSize")
if (newSize > currentSize) {
// 补上差额的线程
for (i <- currentSize until newSize) {
createHandler(i)
}
} else if (newSize for (i <- 1 to (currentSize - newSize)) {
// 关闭部分线程直到达到目标值
runnables.remove(currentSize - i).stop()
}
}
threadPoolSize.set(newSize)
}

3.2 请求分发流程


RequestHandler线程启动后,每个线程都一直从网络层获取Request再交给KafkaApis进行请求分发


// KafkaRequestHandler.scala
def run() {
while (!stopped) {
// 从全局的阻塞队列里面拉取请求(网络层处理完并反序列化后转换成request)
val req = requestChannel.receiveRequest(300)
req match {
// 关闭命令
case RequestChannel.ShutdownRequest =>
shutdownComplete.countDown()
return
// 正常请求
case request: RequestChannel.Request =>
// 请求路由分发
apis.handle(request)
case null => // continue
}
}
shutdownComplete.countDown()
}
// RequestChannel.scala
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

// KafkaApis.scala
def handle(request: RequestChannel.Request) {
// 只是根据request中的apiKey分发请求,实际处理逻辑由对应的handle*方法实现
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIOnS=> handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_COnFIGS=> handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_COnFIGS=> handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIOnS=> handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
}


  1. RequestHandler从全局阻塞队列获取网络层组装完的Request,并调用KafkaApi的handle方法

  2. handle方法类似web开发中的dispatch,根据ApiKeys调用对应的handle*执行实际业务逻辑

可以看到API层只是对执行线程、根据ApiKeys进行请求路由的封装。实际逻辑由KafkaApis类中的handle*方法实现,而其中最核心的方法为PRODUCE与FETCH请求,分别对应消息的生产与消费,下面的文章将专门针对这两种API进行源码分析


同类文章



  • Kafka Broker源码解析一:网络层设计

  • Kafka Broker源码解析三:PRODUCE请求 TODO

  • Kafka Broker源码解析四:FETCH请求求 TODO


推荐阅读
  • 我的读书清单(持续更新)201705311.《一千零一夜》2006(四五年级)2.《中华上下五千年》2008(初一)3.《鲁滨孙漂流记》2008(初二)4.《钢铁是怎样炼成的》20 ... [详细]
  • Windows操作系统提供了Encrypting File System (EFS)作为内置的数据加密工具,特别适用于对NTFS分区上的文件和文件夹进行加密处理。本文将详细介绍如何使用EFS加密文件夹,以及加密过程中的注意事项。 ... [详细]
  • 如何在PHP中安装Xdebug扩展
    本文介绍了如何从PECL下载并编译安装Xdebug扩展,以及如何配置PHP和PHPStorm以启用调试功能。 ... [详细]
  • 本文探讨了在一个物理隔离的环境中构建数据交换平台所面临的挑战,包括但不限于数据加密、传输监控及确保文件交换的安全性和可靠性。同时,作者结合自身项目经验,分享了项目规划、实施过程中的关键决策及其背后的思考。 ... [详细]
  • importjava.io.*;importjava.util.*;publicclass五子棋游戏{staticintm1;staticintn1;staticfinalintS ... [详细]
  • 1#include2#defineM1000103#defineRGregister4#defineinf0x3f3f3f3f5usingnamespacestd;6boolrev ... [详细]
  • 在 Django 模型中,ForeignKey 的 on_delete 参数定义了当关联对象被删除时,当前模型实例的行为。本文详细解释了 on_delete 的各个选项及其具体作用。 ... [详细]
  • 题目编号:2049 [SDOI2008]Cave Exploration。题目描述了一种动态图操作场景,涉及三种基本操作:断开两个节点间的连接(destroy(a,b))、建立两个节点间的连接(connect(a,b))以及查询两节点是否连通(query(a,b))。所有操作均确保图中无环存在。 ... [详细]
  • 嵌套列表的扁平化处理
    本文介绍了一种方法,用于遍历嵌套列表中的每个元素。如果元素是整数,则将其添加到结果数组中;如果元素是一个列表,则递归地遍历这个列表。此方法特别适用于处理复杂数据结构中的嵌套列表。 ... [详细]
  • 本文详细探讨了BCTF竞赛中窃密木马题目的解题策略,重点分析了该题目在漏洞挖掘与利用方面的技巧。 ... [详细]
  • SQL Server 存储过程实践任务(第二部分)
    本文档详细介绍了三个SQL Server存储过程的创建与使用方法,包括统计特定类型客房的入住人数、根据房间号查询客房详情以及删除特定类型的客房记录。 ... [详细]
  • 在编程实践中,正确管理和释放资源是非常重要的。本文将探讨 Python 中的 'with' 关键字及其背后的上下文管理器机制,以及它们如何帮助我们更安全、高效地管理资源。 ... [详细]
  • 材料光学属性集
    材料光学属性集概述了材料在不同光谱下的光学行为,包括可见光透射率、太阳光透射率等关键参数。 ... [详细]
  • 本文提供了《汇编语言 第3版》中检测点11.2的详细参考答案,包括了各指令执行后的状态标志分析。 ... [详细]
  • 使用Echarts for Weixin 小程序实现中国地图及区域点击事件
    本文介绍了如何使用Echarts for Weixin在微信小程序中构建中国地图,并实现区域点击事件。包括效果展示、条件准备和逻辑实现的具体步骤。 ... [详细]
author-avatar
杀手也热血_949
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有