热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【kafka】Consumerisnotsubscribedtoanytopics

1.概述一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下转载:https:blog.csdn.n

在这里插入图片描述


1.概述

一个网友的问题,然后我帮他解决,后来没告诉我后面结果如何了,先转载记录一下
转载:
https://blog.csdn.net/github_32521685/article/details/89953671

产生该问题的原因主要是zookeeper中存在旧版本的kafka-connect topic信息,导致新版本的kafka-connect启动异常:

ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitionsat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1109)at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)

解决办法:

(1) 使用kafka命令列出所有与connect相关的topic:

bin/kafka-topics.sh --list --zookeeper 10.255.8.102:2181

输出:

__consumer_offsets
ambari_kafka_service_check
connect-configs
connect-offsets
connect-status

(2)使用kafka命令删除所有与connect相关的topic:

bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-configs
bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-offsets
bin/kafka-topics.sh --delete --zookeeper 10.255.8.102:2181 --topic connect-status

最后验证是否删除:

bin/kafka-topics.sh --list --zookeeper 10.255.8.102:2181

输出:

__consumer_offsets
ambari_kafka_service_check
connect-configs - marked for deletion
connect-offsets - marked for deletion
connect-status - marked for deletion

输出信息中显示三个connect topic已被标记删除了,要想彻底删除,需要在kafka的server.properties配置文件里设置delete.topic.enable=true


2.分析原因

从指定的主题或者分区获取数据,在poll之前,你没有订阅任何主题或分区是不行的,每一次poll,消费者都会尝试使用最后一次消费的offset作为接下来获取数据的start offset,最后一次消费的offset也可以通过seek(TopicPartition, long)设置或者自动设置
通过源码可以找到:

public ConsumerRecords<K, V> poll(long timeout) {acquire();try {if (timeout < 0)throw new IllegalArgumentException("Timeout must not be negative");// 如果没有任何订阅&#xff0c;抛出异常if (this.subscriptions.hasNoSubscriptionOrUserAssignment())throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");// 一直poll新数据直到超时long start &#61; time.milliseconds();// 距离超时还剩余多少时间long remaining &#61; timeout;do {// 获取数据&#xff0c;如果自动提交&#xff0c;则进行偏移量自动提交&#xff0c;如果设置offset重置&#xff0c;则进行offset重置Map<TopicPartition, List<ConsumerRecord<K, V>>> records &#61; pollOnce(remaining);if (!records.isEmpty()) {// 再返回结果之前&#xff0c;我们可以进行下一轮的fetch请求&#xff0c;避免阻塞等待fetcher.sendFetches();client.pollNoWakeup();// 如果有拦截器进行拦截&#xff0c;没有直接返回if (this.interceptors &#61;&#61; null)return new ConsumerRecords<>(records);elsereturn this.interceptors.onConsume(new ConsumerRecords<>(records));}long elapsed &#61; time.milliseconds() - start;remaining &#61; timeout - elapsed;} while (remaining > 0);return ConsumerRecords.empty();} finally {release();}
}

推荐阅读
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • 我正在使用sql-serverkafka-connect和debezium监视sqlserver数据库,但是当我发布并运行我的wo ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 本文为Codeforces 1294A题目的解析,主要讨论了Collecting Coins整除+不整除问题。文章详细介绍了题目的背景和要求,并给出了解题思路和代码实现。同时提供了在线测评地址和相关参考链接。 ... [详细]
  • 使用nodejs爬取b站番剧数据,计算最佳追番推荐
    本文介绍了如何使用nodejs爬取b站番剧数据,并通过计算得出最佳追番推荐。通过调用相关接口获取番剧数据和评分数据,以及使用相应的算法进行计算。该方法可以帮助用户找到适合自己的番剧进行观看。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
author-avatar
花落---守护者
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有