热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Kafka服务端请求队列中请求的处理

本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。
一、场景分析

    在《深入理解Kafka服务端之Processor线程是如何工作的》中,通过分析得知Processor线程最终将接收到的客户端请求封装成Request对象,放入了RequestChannel的requestQueue请求队列中。那么这些队列中的请求是如何被处理的呢?这篇进行详细分析。

二、图示说明

ddbf5d3b03b83311ca3356b7f66fdcdc.png

三、源码分析

    既然请求都被封装成Request对象放到了请求队列中,那么就肯定会有一个线程去获取这些请求对象,进行相应的处理。在之前Acceptor线程启动的过程中,我们从服务端程序入口(即Kafka.main()方法)开始,一直找到了KafkaServer.startup()方法,当时提到过,整个Kafka服务端的功能都在这个startup()方法中启动,那么继续从startup方法中查找处理请求队列的相应代码,结果如下:

//TODO 创建处理Request请求的线程池,这里的numIoThreads就是线程池的容量,由服务端参数num.io.threads决定,默认为8dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)前面分析Kafka网络架构采用了主从Reactor多线程模型时,提到真正的网络I/O操作会交给I/O线程池中的I/O线程完成,那么这里的KafkaRequestHandlerPool就是处理请求的I/O线程池。

    1. 查看的KafkaRequestHandlerPool主构造函数如下:

class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup { //线程池中线程数量  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " //管理线程的数组 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i 0 until numThreads) { //创建IO处理线程 createHandler(i) } ...这里重点看几个参数:

  • requestChannel:SocketServer 中的请求通道对象。由于KafkaRequestHandlerPool是线程池对象,那么内部线程处理的请求来源在哪儿?请求恰恰是保存在 RequestChannel 中的请求队列requestQueue中,因此,Kafka 在构造 KafkaRequestHandlerPool实例时,必须关联 SocketServer 组件中的 RequestChannel 实例,让 I/O 线程能够找到请求被保存的地方。

  • apis:KafkaApis对象。IO线程会将拿到的请求对象Request交给KafkaApis去执行真正的逻辑处理。

  • numThreads:线程池中线程的数量。初始化线程池时要创建多少个线程由这个参数决定。而这个参数又是由broker端的参数num.io.threads决定的,默认为8。

除了这几个参数,在构造KafkaRequestHandlerPool实例时,会根据线程数创建KafkaRequestHandler线程,并将这些线程放入线程池管理线程的数组runnables中。

    除了主构造函数,再来看几个KafkaRequestHandlerPool的方法:

    a. createHandler(id:Int):创建线程的方法

  • 创建KafkaRequestHandler线程对象

  • 将线程对象放入runnables数组

  • 设置线程为守护线程并启动

def createHandler(id: Int): Unit = synchronized { //创建KafkaRequestHandler线程,并放入runnables数组 runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) //设置线程为守护线程,并启动 KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()}

    b. resizeThreadPool(newSize: Int):根据给定值调整线程池的线程数量

  • 如果给定的值大于当前线程池中的线程数,则创建差值个新的线程,放入线程池并启动

  • 如果给定的值小于当前线程池中的线程数,则将多余的线程从线程池中移除,并关闭移除的线程

  • 更新线程池中线程的数量

def resizeThreadPool(newSize: Int): Unit = synchronized { val currentSize = threadPoolSize.get info(s"Resizing request handler thread pool size from $currentSize to $newSize") //如果给定的值大于当前线程池的容量,则创建差值个新的线程,放入线程池并启动 if (newSize > currentSize) { for (i createHandler(i) } //如果给定的值小于当前线程池容量,则将多余的线程从线程池中移除,并关闭 } else if (newSize

    2. 这里我们再看一下KafkaRequestHandler的主构造函数:

class KafkaRequestHandler(id: Int, //IO线程编号 brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: AtomicInteger,//线程池中线程数量 val requestChannel: RequestChannel,//请求处理的通道,里面包含了requestQueue apis: KafkaApis,//KafkaApis类,用于真正实现请求处理逻辑的类 time: Time) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "  //定义一个CountDownLatch对象,便于管理线程的执行 private val shutdownComplete = new CountDownLatch(1) @volatile private var stopped = false ...参数apis、requestChannel和中KafkaRequestHandlerPool的一样:

  • id:这里的id是线程池中的线程序号
  • totalHandlerThreads:线程池中的线程总数

既然KafkaRequestHandler是一个线程类,那么它的工作逻辑就在run()方法中:

def run() { //只要线程未关闭,就不断地进行循环  while (!stopped) { val startSelectTime = time.nanoseconds //从requestQueue中获取一个Request对象 val req = requestChannel.receiveRequest(300) val endTime = time.nanoseconds //统计线程的空闲时间 val idleTime = endTime - startSelectTime //更新线程空闲百分比指标 aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get) //判断Request请求的类型 req match { //如果是关闭线程的请求 case RequestChannel.ShutdownRequest => debug(s"Kafka request handler $id on broker $brokerId received shut down command") //关闭线程 shutdownComplete.countDown() return //如果是普通请求 case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") //由KafkaApis.handle方法执行相应处理逻辑 apis.handle(request) } catch { //出现严重错误,立即关闭连接 case e: FatalExitError => shutdownComplete.countDown() Exit.exit(e.statusCode) //如果是普通异常,则记录日志 case e: Throwable => error("Exception when handling request", e) } finally { //释放Request占用的缓冲区资源 request.releaseBuffer() } case null => // 继续循环 } } shutdownComplete.countDown()}

整个run()方法的流程图如下:

db3d9f10aa7693383126cda0b16e279a.png

这里最重要的就是调用KafkaApis的handle方法处理请求:

//由KafkaApis.handle方法执行相应处理逻辑apis.handle(request)这个方法会根据不同的请求类型进行最终的逻辑处理,之后封装Response对象并返回给Processor线程的responseQueue队列。

    KafkaApis的处理逻辑较多,下一篇再进行分析。

总结:

请求队列中的Request对象的处理逻辑分以下几步:

  1. 服务端程序启动时,会创建KafkaRequestHandlerPool线程池对象
  2. 构建线程池对象时,根据num.io.threads参数创建KafkaRequestHandler线程,默认创建8个
  3. KafkaRequestHandler线程不断地从requestQueue请求队列中获取请求
  • 如果是普通请求,则调用KafkaApis的handle方法进行处理
  • 如果是关闭线程的请求,则关闭线程
更新线程空闲百分比指标参考资料:https://time.geekbang.org/column/article/233233



推荐阅读
author-avatar
houxue
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有