热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Kafka服务端请求队列中请求的处理

本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。
一、场景分析

    在《深入理解Kafka服务端之Processor线程是如何工作的》中,通过分析得知Processor线程最终将接收到的客户端请求封装成Request对象,放入了RequestChannel的requestQueue请求队列中。那么这些队列中的请求是如何被处理的呢?这篇进行详细分析。

二、图示说明

ddbf5d3b03b83311ca3356b7f66fdcdc.png

三、源码分析

    既然请求都被封装成Request对象放到了请求队列中,那么就肯定会有一个线程去获取这些请求对象,进行相应的处理。在之前Acceptor线程启动的过程中,我们从服务端程序入口(即Kafka.main()方法)开始,一直找到了KafkaServer.startup()方法,当时提到过,整个Kafka服务端的功能都在这个startup()方法中启动,那么继续从startup方法中查找处理请求队列的相应代码,结果如下:

//TODO 创建处理Request请求的线程池,这里的numIoThreads就是线程池的容量,由服务端参数num.io.threads决定,默认为8dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)前面分析Kafka网络架构采用了主从Reactor多线程模型时,提到真正的网络I/O操作会交给I/O线程池中的I/O线程完成,那么这里的KafkaRequestHandlerPool就是处理请求的I/O线程池。

    1. 查看的KafkaRequestHandlerPool主构造函数如下:

class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup { //线程池中线程数量  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " //管理线程的数组 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i 0 until numThreads) { //创建IO处理线程 createHandler(i) } ...这里重点看几个参数:

  • requestChannel:SocketServer 中的请求通道对象。由于KafkaRequestHandlerPool是线程池对象,那么内部线程处理的请求来源在哪儿?请求恰恰是保存在 RequestChannel 中的请求队列requestQueue中,因此,Kafka 在构造 KafkaRequestHandlerPool实例时,必须关联 SocketServer 组件中的 RequestChannel 实例,让 I/O 线程能够找到请求被保存的地方。

  • apis:KafkaApis对象。IO线程会将拿到的请求对象Request交给KafkaApis去执行真正的逻辑处理。

  • numThreads:线程池中线程的数量。初始化线程池时要创建多少个线程由这个参数决定。而这个参数又是由broker端的参数num.io.threads决定的,默认为8。

除了这几个参数,在构造KafkaRequestHandlerPool实例时,会根据线程数创建KafkaRequestHandler线程,并将这些线程放入线程池管理线程的数组runnables中。

    除了主构造函数,再来看几个KafkaRequestHandlerPool的方法:

    a. createHandler(id:Int):创建线程的方法

  • 创建KafkaRequestHandler线程对象

  • 将线程对象放入runnables数组

  • 设置线程为守护线程并启动

def createHandler(id: Int): Unit = synchronized { //创建KafkaRequestHandler线程,并放入runnables数组 runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) //设置线程为守护线程,并启动 KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()}

    b. resizeThreadPool(newSize: Int):根据给定值调整线程池的线程数量

  • 如果给定的值大于当前线程池中的线程数,则创建差值个新的线程,放入线程池并启动

  • 如果给定的值小于当前线程池中的线程数,则将多余的线程从线程池中移除,并关闭移除的线程

  • 更新线程池中线程的数量

def resizeThreadPool(newSize: Int): Unit = synchronized { val currentSize = threadPoolSize.get info(s"Resizing request handler thread pool size from $currentSize to $newSize") //如果给定的值大于当前线程池的容量,则创建差值个新的线程,放入线程池并启动 if (newSize > currentSize) { for (i createHandler(i) } //如果给定的值小于当前线程池容量,则将多余的线程从线程池中移除,并关闭 } else if (newSize

    2. 这里我们再看一下KafkaRequestHandler的主构造函数:

class KafkaRequestHandler(id: Int, //IO线程编号 brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: AtomicInteger,//线程池中线程数量 val requestChannel: RequestChannel,//请求处理的通道,里面包含了requestQueue apis: KafkaApis,//KafkaApis类,用于真正实现请求处理逻辑的类 time: Time) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "  //定义一个CountDownLatch对象,便于管理线程的执行 private val shutdownComplete = new CountDownLatch(1) @volatile private var stopped = false ...参数apis、requestChannel和中KafkaRequestHandlerPool的一样:

  • id:这里的id是线程池中的线程序号
  • totalHandlerThreads:线程池中的线程总数

既然KafkaRequestHandler是一个线程类,那么它的工作逻辑就在run()方法中:

def run() { //只要线程未关闭,就不断地进行循环  while (!stopped) { val startSelectTime = time.nanoseconds //从requestQueue中获取一个Request对象 val req = requestChannel.receiveRequest(300) val endTime = time.nanoseconds //统计线程的空闲时间 val idleTime = endTime - startSelectTime //更新线程空闲百分比指标 aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get) //判断Request请求的类型 req match { //如果是关闭线程的请求 case RequestChannel.ShutdownRequest => debug(s"Kafka request handler $id on broker $brokerId received shut down command") //关闭线程 shutdownComplete.countDown() return //如果是普通请求 case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") //由KafkaApis.handle方法执行相应处理逻辑 apis.handle(request) } catch { //出现严重错误,立即关闭连接 case e: FatalExitError => shutdownComplete.countDown() Exit.exit(e.statusCode) //如果是普通异常,则记录日志 case e: Throwable => error("Exception when handling request", e) } finally { //释放Request占用的缓冲区资源 request.releaseBuffer() } case null => // 继续循环 } } shutdownComplete.countDown()}

整个run()方法的流程图如下:

db3d9f10aa7693383126cda0b16e279a.png

这里最重要的就是调用KafkaApis的handle方法处理请求:

//由KafkaApis.handle方法执行相应处理逻辑apis.handle(request)这个方法会根据不同的请求类型进行最终的逻辑处理,之后封装Response对象并返回给Processor线程的responseQueue队列。

    KafkaApis的处理逻辑较多,下一篇再进行分析。

总结:

请求队列中的Request对象的处理逻辑分以下几步:

  1. 服务端程序启动时,会创建KafkaRequestHandlerPool线程池对象
  2. 构建线程池对象时,根据num.io.threads参数创建KafkaRequestHandler线程,默认创建8个
  3. KafkaRequestHandler线程不断地从requestQueue请求队列中获取请求
  • 如果是普通请求,则调用KafkaApis的handle方法进行处理
  • 如果是关闭线程的请求,则关闭线程
更新线程空闲百分比指标参考资料:https://time.geekbang.org/column/article/233233



推荐阅读
  • Java 类成员初始化顺序与数组创建
    本文探讨了Java中类成员的初始化顺序、静态引入、可变参数以及finalize方法的应用。通过具体的代码示例,详细解释了这些概念及其在实际编程中的使用。 ... [详细]
  • 本文详细探讨了KMP算法中next数组的构建及其应用,重点分析了未改良和改良后的next数组在字符串匹配中的作用。通过具体实例和代码实现,帮助读者更好地理解KMP算法的核心原理。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • MQTT技术周报:硬件连接与协议解析
    本周开发笔记重点介绍了在新项目中使用MQTT协议进行硬件连接的技术细节,涵盖其特性、原理及实现步骤。 ... [详细]
  • andr ... [详细]
  • 本文提供了使用Java实现Bellman-Ford算法解决POJ 3259问题的代码示例,详细解释了如何通过该算法检测负权环来判断时间旅行的可能性。 ... [详细]
  • 实体映射最强工具类:MapStruct真香 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 深入解析Redis内存对象模型
    本文详细介绍了Redis内存对象模型的关键知识点,包括内存统计、内存分配、数据存储细节及优化策略。通过实际案例和专业分析,帮助读者全面理解Redis内存管理机制。 ... [详细]
  • 本题探讨了一种字符串变换方法,旨在判断两个给定的字符串是否可以通过特定的字母替换和位置交换操作相互转换。核心在于找到这些变换中的不变量,从而确定转换的可能性。 ... [详细]
  • 本文详细介绍如何使用Python进行配置文件的读写操作,涵盖常见的配置文件格式(如INI、JSON、TOML和YAML),并提供具体的代码示例。 ... [详细]
  • Java 中 Writer flush()方法,示例 ... [详细]
  • 本文介绍了如何使用 Spring Boot DevTools 实现应用程序在开发过程中自动重启。这一特性显著提高了开发效率,特别是在集成开发环境(IDE)中工作时,能够提供快速的反馈循环。默认情况下,DevTools 会监控类路径上的文件变化,并根据需要触发应用重启。 ... [详细]
  • Java 中的 BigDecimal pow()方法,示例 ... [详细]
  • 主要用了2个类来实现的,话不多说,直接看运行结果,然后在奉上源代码1.Index.javaimportjava.awt.Color;im ... [详细]
author-avatar
houxue
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有