热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

kafka消费者心跳线程

消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息

消息中间件是每个做后台同学必须要掌握的一类框架,这主要取决于其广泛应用于互联网项目。消息中间件在这些系统中扮演着很重要的角色,它们的主要作用是消息异步,系统解耦,高并发削峰,分布式事务等等。目前主要的消息中间件有rabbitMQkafkarocketMQActiveMQ等,本系列文章总结的是kafka,也算是当前市面上比较流行的消息中间件,后续的文章会从kafka的生产者、消费者、broker等来总结。除了在实际应用中,消息中间件是一个常用的框架,在面试中,消息中间件也是必问内容。由于个人能力有限,文中难免有理解不到位的地方,还请留言指导,在此谢过。本系列文章kafka版本使用最新的2.8.0

 

HeartbeatThread分析

Coordinator中最核心的内容是之前提到的重平衡过程,如果对之前的文章还有印象的话可以知道,重平衡发生的原因之一是心跳超时。消费者心跳的作用是协调者用来检查该消费者是否在正常,如果不正常的话,可以立马替换消费者来消费该分区。那么本文主要介绍消费者心跳线程(HeartbeatThread)的执行逻辑和相关源码操作。

 

启动心跳线程

HeartbeatThread是AbstractCoordinator的一个内部类,继承了kafkaThread,所以本质上是一个线程。从之前的文章中我们有提到,HeartbeatThread的创建是在第一次poll的时候创建的,并不是在消费者初始化的时候创建,这是因为消费者需要找到对应的协调者才能发送心跳线程,创建函数的调用方是AbstractCoordinator类中的ensureActiveGroup函数。

    boolean ensureActiveGroup(final Timer timer) {
    //...
    //启动心跳
    startHeartbeatThreadIfNeeded();
    return joinGroupIfNeeded(timer);
    }
    //如果不存在心跳,创建。所以这里是加锁进行判断的,保证线程安全
    private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
    heartbeatThread = new HeartbeatThread();
    heartbeatThread.start();
    }
    }

     

    心跳逻辑

    心跳线程启动之后,在每次消费者poll的时候,就会唤醒心跳线程进行心跳的发送,调用函数是pollHeartbeat。心跳本身是一个线程,其核心在run函数的实现。现在我们看下这个类的实现。

    HeartbeatThread类有三个成员变量,

      private class HeartbeatThread extends KafkaThread implements AutoCloseable {
      //是否允许心跳标志位,写入要加锁
      private boolean enabled = false;
      //是否关闭标志位,写入要加锁
      private boolean closed = false;
      //心跳失败的原因,初始化为null,使用原子整型
      private final AtomicReference failed = new AtomicReference<>(null);
      }

      了解完这几个变量的作用之后,我们看下run的实现逻辑:

       

      从上面的逻辑中可以看到,主要的实现是判断心跳异常处理,如果不是关闭的话,都会重新进入循环体,只有在关闭的情况下才会退出循环。下面看下具体的代码实现。

        public void run() {
        //...
        //主循环体,心跳线程是一个死循环
        while (true) {
        synchronized (AbstractCoordinator.this) {
        //是否关闭
        if (closed)
        return;
        //是否禁止
        if (!enabled) {
        AbstractCoordinator.this.wait();
        continue;
        }
        //状态不对或者心跳失败
        if (state.hasNotJoinedGroup() || hasFailed()) {
        disable();
        continue;
        }
        //唤醒客服端
        client.pollNoWakeup();
        long now = time.milliseconds();
        //协调者未知
        if (coordinatorUnknown()) {
        if (findCoordinatorFuture != null) {
        clearFindCoordinatorFuture();
        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
        } else {
        lookupCoordinator();
        }
        //获取服务端心跳响应超时,置为协调者未知,等待下一轮
        } else if (heartbeat.sessionTimeoutExpired(now)) {
        markCoordinatorUnknown("session timed out without receiving a "
        + "heartbeat response");
        //心跳超时,处理上次消息的时间过长,导致长时间没有poll
        } else if (heartbeat.pollTimeoutExpired(now)) {
        //发送离开请求,这个会导致reblance
        maybeLeaveGroup(leaveReason);
        //是否到了心跳时间
        } else if (!heartbeat.shouldHeartbeat(now)) {
        //阻塞到下一次重试
        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
        } else {
        //设置心跳的时间为当前时间,这个是更新本地心跳的时间
        heartbeat.sentHeartbeat(now);
        //发送心跳请求并获取异步future对象
        final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
        //添加监听器
        heartbeatFuture.addListener(new RequestFutureListener<Void>() {
        @Override
        //心跳成功
        public void onSuccess(Void value) {
        synchronized (AbstractCoordinator.this) {
        heartbeat.receiveHeartbeat();
        }
        }

        @Override
        public void onFailure(RuntimeException e) {
        synchronized (AbstractCoordinator.this) {
        //正在reblance当做正常响应
        if (e instanceof RebalanceInProgressException) {
        heartbeat.receiveHeartbeat();
        } else if (e instanceof FencedInstanceIdException) {
        heartbeatThread.failed.set(e);
        } else {
        heartbeat.failHeartbeat();
        // 唤醒线程,等待下一轮
        AbstractCoordinator.this.notify();
        }
        }
        }
        });
        }
        }
        }
        }
        //...
        //异常处理逻辑
        }

         

        心跳发送和响应

        在上面的心跳逻辑中,心跳的发送函数是sendHeartbeatRequest,在这里面会组装心跳消息并发送心跳,然后组装响应的结果。下面看下这段逻辑的实现:

        sendHeartbeatRequest:发送消息

          synchronized RequestFuture sendHeartbeatRequest() {
          //组装心跳消息,包含消费者组信息,消费者信息
          HeartbeatRequest.Builder requestBuilder =
          new HeartbeatRequest.Builder(new HeartbeatRequestData()
          .setGroupId(rebalanceConfig.groupId)
          .setMemberId(this.generation.memberId)
          .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
          .setGenerationId(this.generation.generationId));
          //发送消息
          return client.send(coordinator, requestBuilder)
          .compose(new HeartbeatResponseHandler(generation));
          }
          处理结果是用一个新的类HeartbeatResponseHandler来实现,现在可以看下这个类的handle方法:
          public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) {
          //处理的异常
          Errors error = heartbeatResponse.error();
          //没有异常,future成功返回
          if (error == Errors.NONE) {
          future.complete(null);
          //协调者未知
          } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
          || error == Errors.NOT_COORDINATOR) {
          markCoordinatorUnknown(error);
          future.raise(error);
          //正在reblance
          } else if (error == Errors.REBALANCE_IN_PROGRESS) {
          if (state == MemberState.STABLE) {
          //如果是稳定状态,请求重新加入
          requestRejoin();
          future.raise(error);
          } else {
          //如果不是stable状态,忽视
          future.complete(null);
          }
          //leader版本已经替换
          } else if (error == Errors.ILLEGAL_GENERATION ||
          error == Errors.UNKNOWN_MEMBER_ID ||
          error == Errors.FENCED_INSTANCE_ID) {
          if (generationUnchanged()) {
          resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
          future.raise(error);
          } else {
          future.complete(null);
          }
          //没权限
          } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
          future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
          } else {
          //其他异常
          future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
          }
          }

          在这里本身就是一个对异常的处理类,服务器返回的不同异常要分类返回给调用者,调用者根据不同的异常类型判断是否重试等操作。在实际开发项目中,也是要求对所有的异常要进行分类,这样方便针对不同的异常做不同的逻辑处理

           

          本文主要介绍了消费者源码中重要的一个环境--心跳线程。心跳是kafka用来管理消费者是否状态ok的情况。在这里一个很重要的问题就是如果我们拉取的消息过多,导致处理消息的时间过长,那消费者就会过久时间没有进行poll,这样就会导致上面的消费者离开集群并发生reblance的过程。这个在实际中还是有可能会出现的,出现这种情况的话,一个可能是消息有问题,处理的线程挂了,这种情况下我们需要将这条消息直接过滤掉;另一个就是消息确实需要处理很久的时间,这种情况下,我们需要设置更长的消息处理时间保证心跳check是ok的,如果最长的时间还是不够的话,我们可能需要用一个异步线程进行消息处理。心跳是一个很重要的内容,我们需要重点了解,不然在实际中可能会出现较多棘手的问题。

          本文的内容就这么多,如果你觉得对你的学习和面试有些帮助,帮忙点个赞或者转发一下哈,谢谢。

           

           

           

           

           

           



          推荐阅读
          • Nginx入门指南:从零开始掌握基础配置与优化技巧
            Nginx入门指南:从零开始掌握基础配置与优化技巧 ... [详细]
          • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
          • 深入解析Tomcat:开发者的实用指南
            深入解析Tomcat:开发者的实用指南 ... [详细]
          • 如何在Java中高效构建WebService
            本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
          • 本课程详细解析了Spring AOP的核心概念及其增强机制,涵盖前置增强、后置增强和环绕增强等类型。通过具体示例,深入探讨了如何在实际开发中有效运用这些增强技术,以提升代码的模块化和可维护性。此外,还介绍了Spring AOP在异常处理和性能监控等场景中的应用,帮助开发者更好地理解和掌握这一强大工具。 ... [详细]
          • 本文介绍了一种专为清洁工人设计的自定义文本烟花效果。通过该功能,用户可以输入特定的感谢或祝福语句,系统将生成绚丽的烟花动画,以表达对清洁工人的敬意和感激之情。该特效不仅美观,还能增强用户的互动体验,提升公共场合的氛围。 ... [详细]
          • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
            在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
          • 初次接触AJAX是在去年,当时主要是通过手动编写客户端代码来实现,还需处理被请求的页面,过程相当繁琐。尽管之前就听说过AJAX.NET,但一直没有机会深入了解。本文将作为初学者的指南,详细介绍AJAX.NET的基本概念、核心功能及其在实际项目中的应用技巧,帮助读者快速上手并掌握这一强大的开发工具。 ... [详细]
          • 使用PyQt5与OpenCV实现电脑摄像头的图像捕捉功能
            本文介绍了如何使用Python中的PyQt5和OpenCV库来实现电脑摄像头的图像捕捉功能。通过结合这两个强大的工具,用户可以轻松地打开摄像头并进行实时图像采集和处理。代码示例展示了如何初始化摄像头、捕获图像并将其显示在PyQt5的图形界面中。此外,还提供了详细的步骤说明和代码注释,帮助开发者快速上手并实现相关功能。 ... [详细]
          • 【VMware vSAN 6.6】1.1.企业级超融合基础设施存储方案:提供全面的软硬件集成支持
            ### 摘要VMware vSAN 6.6 提供了一种全面的企业级超融合基础设施(HCI)存储解决方案,支持广泛的软硬件集成。该方案通过在 vSphere Hypervisor 中内置存储功能,实现了高效的数据管理和资源利用。vSAN 6.6 的架构设计包括带有本地存储的服务器,以及优化的存储控制器虚拟系统,有效克服了传统存储系统的局限性,为企业提供了灵活、可靠的存储环境。 ... [详细]
          • 深入解析 org.hibernate.event.spi.EventSource.getFactory() 方法及其应用实例 ... [详细]
          • 在 Golang 应用中,频繁出现的 TIME_WAIT 和 ESTABLISHED 状态可能会导致性能瓶颈。本文探讨了这些状态产生的原因,并提出了优化与解决策略。通过调整内核参数、优化连接管理和使用连接池技术,可以有效减少 TIME_WAIT 的数量,提高应用的并发处理能力。同时,对于 ESTABLISHED 状态,可以通过合理的超时设置和错误处理机制,确保连接的高效利用和快速释放。 ... [详细]
          • 在Linux系统中Nginx环境下SSL证书的安装步骤与WordPress CDN的高级配置指南
            在Linux系统中,Nginx环境下安装SSL证书的具体步骤及WordPress CDN的高级配置指南。首先,安装SSL证书需要准备两个关键配置文件,并建议在操作前备份相关服务器配置文件,以确保数据安全。随后,本文将详细介绍如何在Nginx中正确配置SSL证书,以及如何优化WordPress的CDN设置,提升网站性能和安全性。 ... [详细]
          • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
          • 2017-09-07前端日报精选JavaScriptEventLoop机制详解与Vue.js中实践应用Redux基础与实践如何用js获取虚拟键盘高度?( ... [详细]
          author-avatar
          min_xie_964
          这个家伙很懒,什么也没留下!
          PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
          Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有