热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Kafka服务端请求队列中请求的处理

本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。
一、场景分析

    在《深入理解Kafka服务端之Processor线程是如何工作的》中,通过分析得知Processor线程最终将接收到的客户端请求封装成Request对象,放入了RequestChannel的requestQueue请求队列中。那么这些队列中的请求是如何被处理的呢?这篇进行详细分析。

二、图示说明

ddbf5d3b03b83311ca3356b7f66fdcdc.png

三、源码分析

    既然请求都被封装成Request对象放到了请求队列中,那么就肯定会有一个线程去获取这些请求对象,进行相应的处理。在之前Acceptor线程启动的过程中,我们从服务端程序入口(即Kafka.main()方法)开始,一直找到了KafkaServer.startup()方法,当时提到过,整个Kafka服务端的功能都在这个startup()方法中启动,那么继续从startup方法中查找处理请求队列的相应代码,结果如下:

//TODO 创建处理Request请求的线程池,这里的numIoThreads就是线程池的容量,由服务端参数num.io.threads决定,默认为8dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)前面分析Kafka网络架构采用了主从Reactor多线程模型时,提到真正的网络I/O操作会交给I/O线程池中的I/O线程完成,那么这里的KafkaRequestHandlerPool就是处理请求的I/O线程池。

    1. 查看的KafkaRequestHandlerPool主构造函数如下:

class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup { //线程池中线程数量  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " //管理线程的数组 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i 0 until numThreads) { //创建IO处理线程 createHandler(i) } ...这里重点看几个参数:

  • requestChannel:SocketServer 中的请求通道对象。由于KafkaRequestHandlerPool是线程池对象,那么内部线程处理的请求来源在哪儿?请求恰恰是保存在 RequestChannel 中的请求队列requestQueue中,因此,Kafka 在构造 KafkaRequestHandlerPool实例时,必须关联 SocketServer 组件中的 RequestChannel 实例,让 I/O 线程能够找到请求被保存的地方。

  • apis:KafkaApis对象。IO线程会将拿到的请求对象Request交给KafkaApis去执行真正的逻辑处理。

  • numThreads:线程池中线程的数量。初始化线程池时要创建多少个线程由这个参数决定。而这个参数又是由broker端的参数num.io.threads决定的,默认为8。

除了这几个参数,在构造KafkaRequestHandlerPool实例时,会根据线程数创建KafkaRequestHandler线程,并将这些线程放入线程池管理线程的数组runnables中。

    除了主构造函数,再来看几个KafkaRequestHandlerPool的方法:

    a. createHandler(id:Int):创建线程的方法

  • 创建KafkaRequestHandler线程对象

  • 将线程对象放入runnables数组

  • 设置线程为守护线程并启动

def createHandler(id: Int): Unit = synchronized { //创建KafkaRequestHandler线程,并放入runnables数组 runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) //设置线程为守护线程,并启动 KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()}

    b. resizeThreadPool(newSize: Int):根据给定值调整线程池的线程数量

  • 如果给定的值大于当前线程池中的线程数,则创建差值个新的线程,放入线程池并启动

  • 如果给定的值小于当前线程池中的线程数,则将多余的线程从线程池中移除,并关闭移除的线程

  • 更新线程池中线程的数量

def resizeThreadPool(newSize: Int): Unit = synchronized { val currentSize = threadPoolSize.get info(s"Resizing request handler thread pool size from $currentSize to $newSize") //如果给定的值大于当前线程池的容量,则创建差值个新的线程,放入线程池并启动 if (newSize > currentSize) { for (i createHandler(i) } //如果给定的值小于当前线程池容量,则将多余的线程从线程池中移除,并关闭 } else if (newSize

    2. 这里我们再看一下KafkaRequestHandler的主构造函数:

class KafkaRequestHandler(id: Int, //IO线程编号 brokerId: Int, val aggregateIdleMeter: Meter, val totalHandlerThreads: AtomicInteger,//线程池中线程数量 val requestChannel: RequestChannel,//请求处理的通道,里面包含了requestQueue apis: KafkaApis,//KafkaApis类,用于真正实现请求处理逻辑的类 time: Time) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "  //定义一个CountDownLatch对象,便于管理线程的执行 private val shutdownComplete = new CountDownLatch(1) @volatile private var stopped = false ...参数apis、requestChannel和中KafkaRequestHandlerPool的一样:

  • id:这里的id是线程池中的线程序号
  • totalHandlerThreads:线程池中的线程总数

既然KafkaRequestHandler是一个线程类,那么它的工作逻辑就在run()方法中:

def run() { //只要线程未关闭,就不断地进行循环  while (!stopped) { val startSelectTime = time.nanoseconds //从requestQueue中获取一个Request对象 val req = requestChannel.receiveRequest(300) val endTime = time.nanoseconds //统计线程的空闲时间 val idleTime = endTime - startSelectTime //更新线程空闲百分比指标 aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get) //判断Request请求的类型 req match { //如果是关闭线程的请求 case RequestChannel.ShutdownRequest => debug(s"Kafka request handler $id on broker $brokerId received shut down command") //关闭线程 shutdownComplete.countDown() return //如果是普通请求 case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") //由KafkaApis.handle方法执行相应处理逻辑 apis.handle(request) } catch { //出现严重错误,立即关闭连接 case e: FatalExitError => shutdownComplete.countDown() Exit.exit(e.statusCode) //如果是普通异常,则记录日志 case e: Throwable => error("Exception when handling request", e) } finally { //释放Request占用的缓冲区资源 request.releaseBuffer() } case null => // 继续循环 } } shutdownComplete.countDown()}

整个run()方法的流程图如下:

db3d9f10aa7693383126cda0b16e279a.png

这里最重要的就是调用KafkaApis的handle方法处理请求:

//由KafkaApis.handle方法执行相应处理逻辑apis.handle(request)这个方法会根据不同的请求类型进行最终的逻辑处理,之后封装Response对象并返回给Processor线程的responseQueue队列。

    KafkaApis的处理逻辑较多,下一篇再进行分析。

总结:

请求队列中的Request对象的处理逻辑分以下几步:

  1. 服务端程序启动时,会创建KafkaRequestHandlerPool线程池对象
  2. 构建线程池对象时,根据num.io.threads参数创建KafkaRequestHandler线程,默认创建8个
  3. KafkaRequestHandler线程不断地从requestQueue请求队列中获取请求
  • 如果是普通请求,则调用KafkaApis的handle方法进行处理
  • 如果是关闭线程的请求,则关闭线程
更新线程空闲百分比指标参考资料:https://time.geekbang.org/column/article/233233



推荐阅读
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • Flutter 核心技术与混合开发模式深入解析
    本文深入探讨了 Flutter 的核心技术,特别是其混合开发模式,包括统一管理模式和三端分离模式,以及混合栈原理。通过对比不同模式的优缺点,帮助开发者选择最适合项目的混合开发策略。 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
  • 在处理大规模数据数组时,优化分页组件对于提高页面加载速度和用户体验至关重要。本文探讨了如何通过高效的分页策略,减少数据渲染的负担,提升应用性能。具体方法包括懒加载、虚拟滚动和数据预取等技术,这些技术能够显著降低内存占用和提升响应速度。通过实际案例分析,展示了这些优化措施的有效性和可行性。 ... [详细]
  • 基于Dubbo与Zipkin的微服务调用链路监控解决方案
    本文提出了一种基于Dubbo与Zipkin的微服务调用链路监控解决方案。通过抽象配置层,支持HTTP和Kafka两种数据上报方式,实现了灵活且高效的调用链路追踪。该方案不仅提升了系统的可维护性和扩展性,还为故障排查提供了强大的支持。 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • 2017年软件开发领域的七大变革
    随着技术的不断进步,2017年对软件开发人员而言将充满挑战与机遇。本文探讨了开发人员需要适应的七个关键变化,包括人工智能、聊天机器人、容器技术、应用程序版本控制、云测试环境、大众开发者崛起以及系统管理的云迁移。 ... [详细]
  • 大华股份2013届校园招聘软件算法类试题D卷
    一、填空题(共17题,每题3分,总共51分)1.设有inta5,*b,**c,执行语句c&b,b&a后,**c的值为________答:5 ... [详细]
  • 驱动程序的基本结构1、Windows驱动程序中重要的数据结构1.1、驱动对象(DRIVER_OBJECT)每个驱动程序会有唯一的驱动对象与之对应,并且这个驱动对象是在驱 ... [详细]
  • ABP框架是ASP.NET Boilerplate的简称,它不仅是一个开源且文档丰富的应用程序框架,还提供了一套基于领域驱动设计(DDD)的最佳实践架构模型。本文将详细介绍ABP框架的特点、项目结构及其在Web API优先架构中的应用。 ... [详细]
  • iOS snow animation
    CTSnowAnimationView.hCTMyCtripCreatedbyalexon1614.Copyright©2016年ctrip.Allrightsreserved.# ... [详细]
  • 使用Tkinter构建51Ape无损音乐爬虫UI
    本文介绍了如何使用Python的内置模块Tkinter来构建一个简单的用户界面,用于爬取51Ape网站上的无损音乐百度云链接。虽然Tkinter入门相对简单,但在实际开发过程中由于文档不足可能会带来一些不便。 ... [详细]
  • C语言编写线程池的简单实现方法
    2019独角兽企业重金招聘Python工程师标准好文章,一起分享——有时我们会需要大量线程来处理一些相互独立的任务,为了避免频繁的申请释放线程所带 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • 未定义的打字稿记录:探索其成因与解决方案 ... [详细]
author-avatar
houxue
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有