Problem description
I have two Flink environments deployed locally, versions 1.9.1 and 1.9.2. Recently, when starting a Flink job in the 1.9.1 environment, the taskmanager log looked normal, but the taskexecutor kept flooding its log with errors like the following:
2021-01-23 09:38:41,743 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
2021-01-23 09:38:41,743 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
2021-01-23 09:38:41,878 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='billow_ex_hy_live_tglog_barrage_report_test', partition=0}]
2021-01-23 09:38:41,883 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='billow_ex_hy_live_tglog_barrage_report_test', partition=0}=-915623761773}.
2021-01-23 09:38:41,887 INFO org.apache.flink.runtime.taskmanager.Task - Source: kafka-reader-_0 -> Sink: Print to Std. Out (1/1) (7f384a6ac95e594178a601f5ed139ce7) switched from RUNNING to FAILED.
java.lang.Exception: java.lang.IllegalAccessError: tried to access method org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.adjustAutoCommitConfig(Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode;)V from class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: tried to access method org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.adjustAutoCommitConfig(Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/config/OffsetCommitMode;)V from class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:223)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:695)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
2021-01-23 09:38:41,889 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: kafka-reader-_0 -> Sink: Print to Std. Out (1/1) (7f384a6ac95e594178a601f5ed139ce7).
2021-01-23 09:38:41,889 DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Source: kafka-reader-_0 -> Sink: Print to Std. Out (1/1) network resources (state: FAILED).
The same job runs fine under Flink 1.9.2. Carefully comparing the taskexecutor startup logs of the two environments, I found that the 1.9.1 taskexecutor loads many extra jars, and the Kafka version it reports is not the one currently running locally.
Tracking down the cause: the old production Flink cluster was a mess. It was missing many dependency jars, some dependencies had to be bundled into the job jar while others had to be excluded at build time, and the Kafka cluster was still on the ancient 0.9 version.
To get those older Flink streaming jobs deployed, I had set up a local Flink 1.9.1 + Kafka 0.9 test environment and dropped all the required jars into Flink's lib directory, including the Kafka 0.9 client jar.
(base) leonlai@LEONLAI-MB0 env % ll flink-1.9.1/lib | wc -l
131
(base) leonlai@LEONLAI-MB0 env % ll flink-1.9.1/lib | grep kafka
-rw-r--r-- 1 leonlai staff 37610 Aug 11 18:58 flink-connector-kafka-0.9_2.11-1.9.1.jar
-rw-r--r-- 1 leonlai staff 104635 Aug 11 18:58 flink-connector-kafka-base_2.11-1.9.1.jar
-rw-r--r-- 1 leonlai staff 641408 Aug 11 18:58 kafka-clients-0.9.0.1.jar
The lib directory of the Flink 1.9.2 environment, by contrast, is clean:
(base) leonlai@LEONLAI-MB0 env % ll flink-1.9.2/lib
total 269872
drwxr-xr-x@ 7 leonlai staff 224 Jan 24 2020 .
drwxr-xr-x@ 13 leonlai staff 416 Jan 24 2020 ..
-rw-r--r--@ 1 leonlai staff 96731949 Jan 24 2020 flink-dist_2.11-1.9.2.jar
-rw-r--r--@ 1 leonlai staff 22182626 Jan 24 2020 flink-table-blink_2.11-1.9.2.jar
-rw-r--r--@ 1 leonlai staff 18750846 Jan 24 2020 flink-table_2.11-1.9.2.jar
-rw-r--r--@ 1 leonlai staff 489884 Aug 8 2019 log4j-1.2.17.jar
-rw-r--r--@ 1 leonlai staff 9931 Aug 8 2019 slf4j-log4j12-1.7.15.jar
But my code targets the new production environment, Flink 1.9.1+ with Kafka 2.2.0, so in the old local environment it clashes with the Kafka 0.9 jars. Jars in lib/ are loaded by Flink's parent classloader, so FlinkKafkaConsumerBase most likely resolves to the old connector jar in lib/ while FlinkKafkaConsumer comes from my job jar; loaded by different classloaders, the two classes no longer belong to the same runtime package, and the call to the protected adjustAutoCommitConfig method fails with the IllegalAccessError above.
Fix
- Restore a clean Flink 1.9.1 environment by backing up and then removing the unneeded jars from lib/, or simply switch the local setup to the clean Flink 1.9.2 environment. Recommended.
- Switch both the code and the local Kafka environment to version 0.9. Not recommended.
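A minimal sketch of the first option, assuming the 1.9.1 install path shown earlier: move the Kafka jars out of lib/ into a backup directory rather than deleting them, so the change is easy to revert.

```shell
# backup_jars <lib-dir> <glob>: move jars matching <glob> from
# <lib-dir> into a sibling <lib-dir>-backup directory.
backup_jars() {
    local lib=$1 pat=$2 jar
    mkdir -p "$lib-backup"
    for jar in "$lib"/$pat; do
        [ -e "$jar" ] && mv "$jar" "$lib-backup/" && echo "moved $jar"
    done
    return 0
}

# Example against the 1.9.1 install described above (path assumed);
# restart the cluster afterwards so the taskexecutor picks up the
# cleaned classpath.
backup_jars flink-1.9.1/lib '*kafka*.jar'
```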