误导:
如果一直查 Job execution failed 的话,查到的大部分解决方法都是需要 nc -lk 端口号
其实这个报错的关键是在kafka上,日志拉下来会看到下面的这一段
日志中的这段,我遇到的是数字是2147483646(可以参考这篇文章),但网上查到的不同数字的解决方式都大同小异
Error: No entry found for connection 2147483646
我的代码情况是:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStream inputStream = env.addSource(new FlinkKafkaConsumer("hotitems", new SimpleStringSchema(), properties));
解决方式是:
在kafka的server.properties这个文件中进行修改,我是修改成下面这样子(ip可以根据自己的需求进行替换)。然后再重启kafka就可以了。
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092