热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【kafka】Consumerisnotsubscribedtoanytopics

1.概述一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下转载:https:blog.csdn.n

在这里插入图片描述


1.概述

一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下
转载:
https://blog.csdn.net/github_32521685/article/details/89953671

产生该问题的原因主要是zookeeper中存在旧版本的kafka-connect topic信息,导致新版本的kafka-connect启动异常:

ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitionsat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1109)at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)

解决办法:

(1) 使用kafka命令列出所有与connect相关的topic:

bin/kafka-topics.sh --list --zookeeper 10.255.8.102:2181

输出:

__consumer_offsets
ambari_kafka_service_check
connect-configs
connect-offsets
connect-status

(2)使用kafka命令删除所有与connect相关的topic:

bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-configs
bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-offsets
bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-status

最后验证是否删除:

bin/kafka-topics.sh --list --zookeeper 10.255.8.102:2181

输出:

__consumer_offsets
ambari_kafka_service_check
connect-configs - marked for deletion
connect-offsets - marked for deletion
connect-status - marked for deletion

输出信息中显示三个connect topic已被标记删除了,要想彻底删除,需要在kafka的server.properties配置文件里设置delete.topic.enable=true


2.分析原因

从指定的主题或者分区获取数据,在poll之前,你没有订阅任何主题或分区是不行的,每一次poll,消费者都会尝试使用最后一次消费的offset作为接下来获取数据的start offset,最后一次消费的offset也可以通过seek(TopicPartition, long)设置或者自动设置
通过源码可以找到:

public ConsumerRecords<K, V> poll(long timeout) {acquire();try {if (timeout < 0)throw new IllegalArgumentException("Timeout must not be negative");// 如果没有任何订阅&#xff0c;抛出异常if (this.subscriptions.hasNoSubscriptionOrUserAssignment())throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");// 一直poll新数据直到超时long start &#61; time.milliseconds();// 距离超时还剩余多少时间long remaining &#61; timeout;do {// 获取数据&#xff0c;如果自动提交&#xff0c;则进行偏移量自动提交&#xff0c;如果设置offset重置&#xff0c;则进行offset重置Map<TopicPartition, List<ConsumerRecord<K, V>>> records &#61; pollOnce(remaining);if (!records.isEmpty()) {// 再返回结果之前&#xff0c;我们可以进行下一轮的fetch请求&#xff0c;避免阻塞等待fetcher.sendFetches();client.pollNoWakeup();// 如果有拦截器进行拦截&#xff0c;没有直接返回if (this.interceptors &#61;&#61; null)return new ConsumerRecords<>(records);elsereturn this.interceptors.onConsume(new ConsumerRecords<>(records));}long elapsed &#61; time.milliseconds() - start;remaining &#61; timeout - elapsed;} while (remaining > 0);return ConsumerRecords.empty();} finally {release();}
}

推荐阅读
author-avatar
花落---守护者
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有