ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1109)
    at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)
    at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)
    at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)
public ConsumerRecords<K, V>poll(long timeout){acquire();try{if(timeout <0)thrownewIllegalArgumentException("Timeout must not be negative");// 如果没有任何订阅&#xff0c;抛出异常if(this.subscriptions.hasNoSubscriptionOrUserAssignment())thrownewIllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");// 一直poll新数据直到超时long start &#61; time.milliseconds();// 距离超时还剩余多少时间long remaining &#61; timeout;do{// 获取数据&#xff0c;如果自动提交&#xff0c;则进行偏移量自动提交&#xff0c;如果设置offset重置&#xff0c;则进行offset重置Map<TopicPartition, List<ConsumerRecord<K, V>>> records &#61;pollOnce(remaining);if(!records.isEmpty()){// 再返回结果之前&#xff0c;我们可以进行下一轮的fetch请求&#xff0c;避免阻塞等待fetcher.sendFetches();client.pollNoWakeup();// 如果有拦截器进行拦截&#xff0c;没有直接返回if(this.interceptors &#61;&#61; null)returnnewConsumerRecords<>(records);elsereturnthis.interceptors.onConsume(newConsumerRecords<>(records));}long elapsed &#61; time.milliseconds()- start;remaining &#61; timeout - elapsed;}while(remaining >0);return ConsumerRecords.empty();}finally{release();} }