热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka版本不兼容引发TaskExecutor启动故障分析与解决

在本地环境中部署了两个不同版本的Flink集群,分别为1.9.1和1.9.2。近期在尝试启动1.9.1版本的Flink任务时,遇到了TaskExecutor启动失败的问题。尽管TaskManager日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由Kafka版本不兼容引起的。通过调整Kafka客户端配置并升级相关依赖,最终成功解决了这一故障。

问题描述

本地部署过两套flink环境,分别是1.9.1和1.9.2版本。最近在1.9.1版本下启动flink任务时,taskmanager日志正常,但taskexecutor疯狂打印日志,如下

2021-01-23 09:38:41,743 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.9.0.1
2021-01-23 09:38:41,743 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 23c69d62a0cabf06
2021-01-23 09:38:41,878 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='billow_ex_hy_live_tglog_barrage_report_test', partition=0}]
2021-01-23 09:38:41,883 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='billow_ex_hy_live_tglog_barrage_report_test', partition=0}=-915623761773}.
2021-01-23 09:38:41,887 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: kafka-reader-_0 -> Sink: Print to Std. Out (1/1) (7f384a6ac95e594178a601f5ed139ce7) switched from RUNNING to FAILED.
java.lang.Exception: java.lang.IllegalAccessError: tried to access method org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.adjustAutoCommitConfig(Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode;)V from class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access method org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.adjustAutoCommitConfig(Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode;)V from class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:223)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
2021-01-23 09:38:41,889 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: kafka-reader-_0 -> Sink: Print to Std. Out (1/1) (7f384a6ac95e594178a601f5ed139ce7).
2021-01-23 09:38:41,889 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Release task Source: kafka-reader-_0 -> Sink: Print to Std. Out (1/1) network resources (state: FAILED).

但在flink 1.9.2版本下不存在问题,仔细对比两个环境下的taskexecutor启动日志, 发现1.9.1版本的环境taskexecutor加载了很多包,kafka版本也不是目前本地运行的。

跟踪原因,这个是因为之前生产环境的flink集群,各种问题相当垃圾,少了很多依赖包,编译的时候有些要jar打包进去有些要去掉,kafka集群还是上古时代的0.9版本。

为了应付之前的flink实时任务部署上线,所以本地开发测试搭建了了flink 1.9.1 + kafka 0.9的环境,还在flink的lib目录下放了各种所需要的包,还包括kafka 0.9版本的jar包。

(base) leonlai@LEONLAI-MB0 env % ll flink-1.9.1/lib | wc -l     
     131

(base) leonlai@LEONLAI-MB0 env % ll flink-1.9.1/lib | grep kafka
-rw-r--r--    1 leonlai  staff     37610 Aug 11 18:58 flink-connector-kafka-0.9_2.11-1.9.1.jar
-rw-r--r--    1 leonlai  staff    104635 Aug 11 18:58 flink-connector-kafka-base_2.11-1.9.1.jar
-rw-r--r--    1 leonlai  staff    641408 Aug 11 18:58 kafka-clients-0.9.0.1.jar

看下flink 1.9.2环境的 lib目录,很干净

(base) leonlai@LEONLAI-MB0 env % ll flink-1.9.2/lib        
total 269872
drwxr-xr-x@  7 leonlai  staff       224 Jan 24  2020 .
drwxr-xr-x@ 13 leonlai  staff       416 Jan 24  2020 ..
-rw-r--r--@  1 leonlai  staff  96731949 Jan 24  2020 flink-dist_2.11-1.9.2.jar
-rw-r--r--@  1 leonlai  staff  22182626 Jan 24  2020 flink-table-blink_2.11-1.9.2.jar
-rw-r--r--@  1 leonlai  staff  18750846 Jan 24  2020 flink-table_2.11-1.9.2.jar
-rw-r--r--@  1 leonlai  staff    489884 Aug  8  2019 log4j-1.2.17.jar
-rw-r--r--@  1 leonlai  staff      9931 Aug  8  2019 slf4j-log4j12-1.7.15.jar

 但我的代码是基于新生产环境写的,flink 1.9.1以上的版本+kafka 2.2.0版本,所以在旧的环境下跟kafka 0.9版本冲突了。

解决方法


  1. 恢复flink 1.9.1干净环境,把不需要的包备份然后删掉;或者本地切换到flin 1.9.2干净环境即可,建议。
  2. 将代码和本地kafka环境统一切换到0.9版本,不建议。

 

 

 


推荐阅读
author-avatar
开心宝2502869253
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有