热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink遇到KafkaFlinkKafkaConsumer使用详解

Flink是新一代的流处理计算引擎。通过轻量级的checkpoint,Flink可以在高吞吐量的情况下保证exactly-once(这需要数据源能够提供回溯消费的能力

Flink是新一代的流处理计算引擎。通过轻量级的checkpoint,Flink可以在高吞吐量的情况下保证exactly-once(这需要数据源能够提供回溯消费的能力)。Flink支持众多的source(从中读取数据)和sink(向其写入数据),列表如下:

在这里插入图片描述

Kafka作为目前非常流行的消息中间件,它不仅能够提供极大的吞吐量,还能够配合Flink在消费端达到exactly-once。

本文将详细介绍如何配置Flink读取Kafka,运行机制和exactly-once是如何保证的,最后,还会给出监控Flink消费Kafka的方案。(注: 本文的使用的是Flink 1.3.1-release和 Kafka 0.8)

Flink 是通过Connector与具体的source 和 sink进行通信的,具体到Kafka 0.8,相应的Connector是 FlinkKafkaConsumer08和FlinkKafkaProducer08。

我们首先介绍FlinkKafkaConsumer08的配置:

一、 Kafka Consumer的配置

FlinkKafkaConsumer08可以消费一个或多个Kafka topic的数据,它的构造器需要接收以下参数:

  1. topic名或 topic名的列表

  2. 反序列化约束,以便于Flink决定如何反序列化从Kafka获得的数据

  3. Kafka consumer的属性配置,下面两个属性配置是必须的:

· “zookeeper.connect” (Zookeeper servers的地址列表,以逗号分隔)

· “group.id” (consumer group)

· “bootstrap.servers” (Kafka brokers的地址列表,以逗号分隔)

示例代码:
在这里插入图片描述

以下几个参数是需要我们重点关注的。

(一) 反序列化shema

Flink Kafka Consumer 需要知道如何将来自Kafka的二进制数据转换为Java/Scala对象。DeserializationSchema接口允许程序员指定这个序列化的实现。该接口的 T deserialize(byte[]message) 会在收到每一条Kafka的消息的时候被调用。

我们通常会实现 AbstractDeserializationSchema,它可以描述被序列化的Java/Scala类型到Flink的类型(TypeInformation)的映射。如果用户的代码实现了DeserializationSchema,那么就需要自己实现getProducedType(…) 方法。

为了方便使用,Flink提供了一些已实现的schema:

  1. TypeInformationSerializationSchema (andTypeInformationKeyValueSerializationSchema) ,他们会基于Flink的TypeInformation来创建schema。这对于那些从Flink写入,又从Flink读出的数据是很有用的。这种Flink-specific的反序列化会比其他通用的序列化方式带来更高的性能。

  2. JsonDeserializationSchema (andJSONKeyValueDeserializationSchema) 可以把序列化后的Json反序列化成ObjectNode,ObjectNode可以通过objectNode.get(“field”).as(Int/String/…)() 来访问指定的字段。

  3. SimpleStringSchema可以将消息反序列化为字符串。当我们接收到消息并且反序列化失败的时候,会出现以下两种情况: 1) Flink从deserialize(…)方法中抛出异常,这会导致job的失败,然后job会重启;2) 在deserialize(…) 方法出现失败的时候返回null,这会让Flink Kafka consumer默默的忽略这条消息。请注意,如果配置了checkpoint 为enable,由于consumer的失败容忍机制,失败的消息会被继续消费,因此还会继续失败,这就会导致job被不断自动重启。

(二) Kafka Consumers 起始offset配置

FlinkKafkaConsumer 允许我们配置Kafka partition被消费的offset的起始位,示例代码如下:

在这里插入图片描述

所有版本的Flink KafkaConsumer都支持以上的配置,下面对这些配置进行详细的说明:

setStartFromGroupOffsets(默认):采用consumer group的offset来作为起始位,这个offset从Kafka brokers(0.9以上版本) 或 Zookeeper(Kafka 0.8)中获取。如果从Kafka brokers或者Zookeeper中找不到这个consumer group对应的partition的offset,那么auto.offset.reset这个配置就会被启用。

setStartFromEarliest() /setStartFromLatest(): 即从最早的/最新的消息开始消费。

当然,也可以指定具体的某个offset作为某个partition的起始消费位置:
在这里插入图片描述

上述的代码配置了myTopic的partition 0,1,2在被Flink job消费的起始位置。假设myTopic总共有5个partition,那么剩下的两个partition没有被配置具体的offset的起始位,所以Flink会对这两个partition的采用默认的offset起始位的配置(setStartFromGroupOffsets)。

注意,如果你在这个job中配置了enableCheckpointing() 或者从某个savepoint来启动这个job,那么起始位会优先从savepoint或者checkpoint中获取。

(三) 容错机制

当Flink的job开启了checkpoint的时候,Flink会一边消费topic的数据,一边定时的将offset和其他operator的状态记录到checkpoint中。如果遇到了job失败的情况,那么Flink将会重启job,从最后一个checkpoint中来恢复job的所有状态,然后从checkpoint中记录的offset开始重新对Kafka 的topic进行消费。记录offset的间隔决定了程序在失败的情况下需要回溯的最大程度。

为了使用Flink Kafkaconsumer的容错机制,我们需要在程序中作如下的配置:

在这里插入图片描述

还有一点需要注意的是,Flink只有在task slot的数量足够的情况下才可以成功的重启job,所以如果job是因为TaskManager down掉(或者无法连接到集群)导致task slot不足而失败,那么必须要恢复增加足够的task slot才能让job重启。而Flink on YARN 支持自动的重启丢失的YARN containers。

(四) offset提交行为的配置

Flink KafkaConsumer允许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。需要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。

Checkpointingdisabled: 此时, Flink Kafka Consumer依赖于它使用的具体的Kafka client的自动定期提交offset的行为,相应的设置是 Kafka properties中的 enable.auto.commit (或者 auto.commit.enable 对于Kafka 0.8) 以及 auto.commit.interval.ms。

Checkpointingenabled: 在这种情况下,Flink Kafka Consumer会将offset存到checkpoint中当checkpoint 处于completed的状态时。这保证了在Kafka brokers中的committed offset和checkpointed states中的offset保持一致。通过调用setCommitOffsetOnCheckpoints(boolean)来调整 offset自动提交是否开启(默认情况下是true,即开启自动提交)。请注意,在这种情况下,配置在properties 中的offset的定时自动提交行为将会被忽略。

二、Flink Kafka Consumer的运行机制
在这里插入图片描述

上图简要概括了FlinkKafkaConsumer08的运行机制。每个消费Kafka source的operator的subTask线程都持有一个FlinkKafkaConsumer08实例,这个实例负责分配这个subTask线程消费的topic的具体的partition,以及从checkpoint中恢复partition应该消费的起始offset。

Kafka08Fetcher负责和Kafka brokers通信,获取具体各个partition的leader,每个FlinkKafkaConsumer08都拥有一个Kafka08Fetcher。每个Kafka08Fetcher拥有一个或多个SimpleConsumerThread,SimpleConsumerThread负责从partition的leader中拉取数据,并将其反序列化,最后发送给下一级的operator,注意在SimpleConsumerThread中使用的是Kafka的低级API,这是因为它需要灵活的控制从某个具体的offset进行消费。

(一) 生命周期

下面,我们罗列出FlinkKafkaConsumer08、Kafka08Fetcher和SimpleConsumerThread生命周期的几个关键点(关于每个Flink StreamTask的生命周期,可以参考:https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/task_lifecycle.html),以弄清楚FlinkKafkaConsumer08是如何恢复offset 以及保证exactly-once的。

1.FlinkKafkaConsumer08:

  1. initializeState():从最后一个成功的checkpoint中获取各个partition的offset到restoredState中。

  2. open():从restoredState中获取这个subTask所消费的topic的partition的起始offset,保存到subscribedPartitionsToStartOffsets中;如果这是一个第一次向topic消费的job的subTask,那么Flink根据job的并行度以及这个subTask的index均匀的分配partition给这个subTask消费。此时,partition的起始offset就由我们在上文中介绍的配置来决定。

  3. run(): 如果subscribedPartitionsToStartOffsets不为空,创建Kafka08Fetcher,执行其runFetchLoop()。

  4. close(): job被cancel或者出现了异常,那么调用close()方法,close方法会调用Fetcher的cancel()方法,FlinkKafkaConsumer08所在的subTask结束。


  1. Kafka08Fetcher:

Kafka08Fetcher从FlinkKafkaConsumer08中获取它要消费的partition加入到它的unassignedPartitionsQueue中。然后在它的runFetchLoop()中,对这个Queue中的partition进行消费。下面主要罗列runFetchLoop()中的主要细节。

  1. 首先创建brokerToThread:Map,这个map会把broker和连接到这个broker的线程映射起来。创建zookeeperOffsetHandler,根据配置的起始的offset行为从zookeeper或Kafka中获取没有从checkpoint中恢复的partition的offset。然后创建PeriodicOffsetCommitter线程周期性的向Zookeeper提交offset。

  2. 向Flink内部(JMX)注册MetricGroup,名为"KafkaConsumer",我们可以利用这个MetricGroup来对FlinkKafakConsumer08的消费状况进行监控。

  3. while(running) 循环:获取unassignedPartitionsQueue中应该被消费的partition,通过低级api寻找他们的Leader。然后从brokerToThread中寻找这个leader对应的SimpleConsumerThread是否被创建了,如果没有被创建,那么创建并运行一个SimpleConsumerThread,并更新brokerToThread;如果已经被创建,并且没有新的partition加入,那么重复这个while循环。否则,终止这个leader对应的SimpleConsumerThread,并创建新的SimpleConsumerThread然后继续消费。

  4. cancel():如果发生了异常或者job被手动cancel,关闭zookeeperOffsetHandler、periodicCommitter和brokerToThread中的所有线程。

3.SimpleConsumerThread:

  1. 创建SimpleConsumer(Kafka低级api),然后再一次根据配置的起始offset行为去Kafka中获取没有从checkpoint和Zookeeper中恢复的partition的offset。

  2. while(running) 循环,我们跳过errorhandle,直接展示最核心的代码:
    在这里插入图片描述

其中valueBytes和keyBytes均为从kafka中读取到的消息,他们经过反序列化后交给owner发送消息给下一级的operator。

在这里插入图片描述

这里的owner就是Kafka08Fetcher,它除了会将消息发送给下一级的operator外,还会记录这个partition的state(即offset),Flink异步的将这个state记录到checkpoint中。

这里需要注意的是,Flink异步记录checkpoint的行为是由我们的来配置的,只有当我们设置了enableCheckpointing()时,Flink才会在checkpoint完成时(整个job的所有的operator都收到了这个checkpoint的barrier才意味这checkpoint完成,具体参考我们对Flink checkpoint的介绍)将offset记录起来并提交,这时候才能够保证exactly-once。

  1. 如果线程被终止,那么关闭SimpleConsumer。

(二) 容错机制

在发生错误的情况下,Flink会如何处理呢?在finally块中记录最后消费到的offset再向JobManager提交checkpoint吗?在通常情况下,比如发生了手动cancel或者userCode的异常时,这么做没有问题。可是如果是因为其他原因(如Full GC)使得TaskManagerhung住了,甚至是机器挂了,那么这个时候就不能通过finally 块来保证exactly-once了。Flink依赖的是带barrier的checkpointing机制来解决容错的问题。

我们通过下面一副图来简述这种机制:

在这里插入图片描述

barrier可以理解为checkpoint之间的分隔符,在它之前的data属于前一个checkpoint,而在它之后的data属于另一个checkpoint。同时,barrier会由source(如FlinkKafkaConsumer)发起,并混在数据中,同数据一样传输给下一级的operator,直到sink为止。假设我们的Streaming Job只有一个source、一个map operator 以及一个sink,属于barrier所分隔的checkpoint 的数据已经被处理完毕并sink,而barrier还处于source和map operator之间,barrier 还处于map和sink之间。由于barrier已经被sink收到,那么说明checkpoint已经完成了(这个checkpoint的状态为completed并被存到了state backend中),它之前的数据已经被处理完毕并sink。

但是由于sink还没有收到barrier,那么所有之前之后的数据都会被缓存在sink的Input Buffer中,也就是说这部分数据虽然已经经过source消费并经过map处理了,但是还是没有写入目的地。所以如果Job在这个时候失败了,最后一个成功committed的checkpoint是checkpoint,所以FlinkKafkaConsumer从checkpoint中恢复出相应的partitionoffset就可以了。

我们注意到,虽然之后的部分数据和之后的所有数据虽然已经被source消费,但是都没有被sink,这部分数据会被FlinkKafkaConsumer“重复”消费,我们并没有丢失任何的数据也没有重复写入任何数据,保证了exactly-once。

小节:

  1. 在配置了checkpointingenable的情况下,FlinkKafkaConsumer08在开始消费数据之前,会优先从checkpoint中恢复出被消费的partition的offset,如果没有从checkpoint中恢复某些partition的offset,它会从Zookeeper中恢复,若从Zookeeper中仍然没有恢复,它会根据配置的offset起始行为来配置起始offset。

2.FlinkKafkaConsumer08通过Kafka的低级API和Flink带barrier的轻量级checkpoint机制保证了在高吞吐量的情况下的exactly-once。

三、监控

我们在前文提到,Flink会定时的将offset提交到Zookeeper中,但是提交到Zookeeper的offset并不是实时的offset。官方更为推荐从Flink注册的Metric来监控Flink的消费情况。
在这里插入图片描述

上面的两幅图片显示了analytics_package_standard这个kafka topic的 partition-0在Flink中的当前offset,这个offset我们可以通过jconsole、jvisualvm等工具查看,也可以直接通过Flink TaskManager开启的JMX 端口获取。如果我们要监控消费淤积,再从Kafka中获取相应partition的latestoffset即可。我们可以在配置文件flink-conf.yaml中配置Flink的JMX端口:

Flink 1.3中,还增加了许多有用的监控,比如总的消息条数,消息的瞬时读取速度,latency等等,我们可以在Flink的Web UI的Task Metric中查看这些监控指标:

在这里插入图片描述

结论

FlinkKafkaConsumer提供了一套健壮的机制保证了在高吞吐量的情况下exactly-once的消费Kafka的数据,它的API的使用与配置也比较简单,同时也便于监控。

转发链接:https://www.sohu.com/a/168546400_617676


推荐阅读
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 在当今的软件开发领域,分布式技术已成为程序员不可或缺的核心技能之一,尤其在面试中更是考察的重点。无论是小微企业还是大型企业,掌握分布式技术对于提升工作效率和解决实际问题都至关重要。本周的Java架构师实战训练营中,我们深入探讨了Kafka这一高效的分布式消息系统,它不仅支持发布订阅模式,还能在高并发场景下保持高性能和高可靠性。通过实际案例和代码演练,学员们对Kafka的应用有了更加深刻的理解。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 基于Dubbo与Zipkin的微服务调用链路监控解决方案
    本文提出了一种基于Dubbo与Zipkin的微服务调用链路监控解决方案。通过抽象配置层,支持HTTP和Kafka两种数据上报方式,实现了灵活且高效的调用链路追踪。该方案不仅提升了系统的可维护性和扩展性,还为故障排查提供了强大的支持。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • Java中高级工程师面试必备:JVM核心知识点全面解析
    对于软件开发人员而言,随着技术框架的不断演进和成熟,许多高级功能已经被高度封装,使得初级开发者只需掌握基本用法即可迅速完成项目。然而,对于中高级工程师而言,深入了解Java虚拟机(JVM)的核心知识点是必不可少的。这不仅有助于优化性能和解决复杂问题,还能在面试中脱颖而出。本文将全面解析JVM的关键概念和技术细节,帮助读者全面提升技术水平。 ... [详细]
  • Spring框架中的面向切面编程(AOP)技术详解
    面向切面编程(AOP)是Spring框架中的关键技术之一,它通过将横切关注点从业务逻辑中分离出来,实现了代码的模块化和重用。AOP的核心思想是将程序运行过程中需要多次处理的功能(如日志记录、事务管理等)封装成独立的模块,即切面,并在特定的连接点(如方法调用)动态地应用这些切面。这种方式不仅提高了代码的可维护性和可读性,还简化了业务逻辑的实现。Spring AOP利用代理机制,在不修改原有代码的基础上,实现了对目标对象的增强。 ... [详细]
author-avatar
Jump_jiedB0_666
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有