一、场景分析 在《深入理解Kafka服务端之Processor线程是如何工作的》中,通过分析得知Processor线程最终将接收到的客户端请求封装成Request对象,放入了RequestChannel的requestQueue请求队列中。那么这些队列中的请求是如何被处理的呢?这篇进行详细分析。
二、图示说明三、源码分析
既然请求都被封装成Request对象放到了请求队列中,那么就肯定会有一个线程去获取这些请求对象,进行相应的处理。在之前Acceptor线程启动的过程中,我们从服务端程序入口(即Kafka.main()方法)开始,一直找到了KafkaServer.startup()方法,当时提到过,整个Kafka服务端的功能都在这个startup()方法中启动,那么继续从startup方法中查找处理请求队列的相应代码,结果如下:
//TODO 创建处理Request请求的线程池,这里的numIoThreads就是线程池的容量,由服务端参数num.io.threads决定,默认为8dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
前面分析Kafka网络架构采用了主从Reactor多线程模型时,提到真正的网络I/O操作会交给I/O线程池中的I/O线程完成,那么这里的KafkaRequestHandlerPool就是处理请求的I/O线程池。
1. 查看的KafkaRequestHandlerPool主构造函数如下:
class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup { //线程池中线程数量 private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " //管理线程的数组 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i 0 until numThreads) { //创建IO处理线程 createHandler(i) } ...
这里重点看几个参数:
requestChannel:SocketServer 中的请求通道对象。由于KafkaRequestHandlerPool是线程池对象,那么内部线程处理的请求来源在哪儿?请求恰恰是保存在 RequestChannel 中的请求队列requestQueue中,因此,Kafka 在构造 KafkaRequestHandlerPool实例时,必须关联 SocketServer 组件中的 RequestChannel 实例,让 I/O 线程能够找到请求被保存的地方。
apis:KafkaApis对象。IO线程会将拿到的请求对象Request交给KafkaApis去执行真正的逻辑处理。
numThreads:线程池中线程的数量。初始化线程池时要创建多少个线程由这个参数决定。而这个参数又是由broker端的参数num.io.threads决定的,默认为8。
除了这几个参数,在构造KafkaRequestHandlerPool实例时,会根据线程数创建KafkaRequestHandler线程,并将这些线程放入线程池管理线程的数组runnables中。
除了主构造函数,再来看几个KafkaRequestHandlerPool的方法:
a. createHandler(id:Int):创建线程的方法
def createHandler(id: Int): Unit = synchronized { //创建KafkaRequestHandler线程,并放入runnables数组 runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) //设置线程为守护线程,并启动 KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()}
b. resizeThreadPool(newSize: Int):根据给定值调整线程池的线程数量
def resizeThreadPool(newSize: Int): Unit = synchronized { val currentSize = threadPoolSize.get info(s"Resizing request handler thread pool size from $currentSize to $newSize") //如果给定的值大于当前线程池的容量,则创建差值个新的线程,放入线程池并启动 if (newSize > currentSize) { for (i createHandler(i) } //如果给定的值小于当前线程池容量,则将多余的线程从线程池中移除,并关闭 } else if (newSize 2. 这里我们再看一下KafkaRequestHandler的主构造函数:
class KafkaRequestHandler(id: Int, //IO线程编号 brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: AtomicInteger,//线程池中线程数量 val requestChannel: RequestChannel,//请求处理的通道,里面包含了requestQueue apis: KafkaApis,//KafkaApis类,用于真正实现请求处理逻辑的类 time: Time) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], " //定义一个CountDownLatch对象,便于管理线程的执行 private val shutdownComplete = new CountDownLatch(1) @volatile private var stopped = false ...
参数apis、requestChannel和中KafkaRequestHandlerPool的一样:
既然KafkaRequestHandler是一个线程类,那么它的工作逻辑就在run()方法中:def run() { //只要线程未关闭,就不断地进行循环 while (!stopped) { val startSelectTime = time.nanoseconds //从requestQueue中获取一个Request对象 val req = requestChannel.receiveRequest(300) val endTime = time.nanoseconds //统计线程的空闲时间 val idleTime = endTime - startSelectTime //更新线程空闲百分比指标 aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get) //判断Request请求的类型 req match { //如果是关闭线程的请求 case RequestChannel.ShutdownRequest => debug(s"Kafka request handler $id on broker $brokerId received shut down command") //关闭线程 shutdownComplete.countDown() return //如果是普通请求 case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") //由KafkaApis.handle方法执行相应处理逻辑 apis.handle(request) } catch { //出现严重错误,立即关闭连接 case e: FatalExitError => shutdownComplete.countDown() Exit.exit(e.statusCode) //如果是普通异常,则记录日志 case e: Throwable => error("Exception when handling request", e) } finally { //释放Request占用的缓冲区资源 request.releaseBuffer() } case null => // 继续循环 } } shutdownComplete.countDown()}
整个run()方法的流程图如下:
这里最重要的就是调用KafkaApis的handle方法处理请求://由KafkaApis.handle方法执行相应处理逻辑apis.handle(request)
这个方法会根据不同的请求类型进行最终的逻辑处理,之后封装Response对象并返回给Processor线程的responseQueue队列。
KafkaApis的处理逻辑较多,下一篇再进行分析。
总结:
请求队列中的Request对象的处理逻辑分以下几步:
- 服务端程序启动时,会创建KafkaRequestHandlerPool线程池对象
- 构建线程池对象时,根据num.io.threads参数创建KafkaRequestHandler线程,默认创建8个
- KafkaRequestHandler线程不断地从requestQueue请求队列中获取请求
- 如果是普通请求,则调用KafkaApis的handle方法进行处理
- 如果是关闭线程的请求,则关闭线程
更新线程空闲百分比指标参考资料:https://time.geekbang.org/column/article/233233