作者:mobiledu2502872023 | 来源:互联网 | 2023-09-18 16:57
今日在使用kafka时,发现将 auto.offset.reset 设置为earliest、latest、none都没有达到自己预期的效果。
- earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
- none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
模拟一:
方法:本轮配置的topic有被提交过过offset 值,之后将自动提交 offset值设置为false,并且也不手动提交offset 值;同时生产者生产新数据
@Bean
public KafkaConsumer kafkaConsumer() {
Map cOnfig= new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer cOnsumer= new KafkaConsumer<>(config);
return consumer;
}
earliest:读取全部新产生的数据,服务重启也会重复进行消费
latest:读取全部新产生的数据,服务重启也会重复进行消费
none:读取全部新产生的数据,服务重启也会重复进行消费
结论:各分区下有已提交的offset,三个参数都会从offset 后面开始消费
模拟二:
我们需要消费kafka上面积累的所有数据(默认48小时),如何消费?
在配置生产者时,我们重新配置一个新的groupId,新的groupId没有配置过 offset,重启程序
@Bean
public KafkaConsumer kafkaConsumer() {
Map cOnfig= new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup-new");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer cOnsumer= new KafkaConsumer<>(config);
return consumer;
}
earliest:读取全部历史数据数据,服务重启也会重复进行消费
latest:读取生产者当前新产生的数据,不会消费之前的数据
none:服务重启后,程序报错
结论:创建新分组时,earliest 每个分区是从头开始消费的。latest读取最新数据,none 没有为消费者组找到先前的offset值时,抛出异常
总结:
1.如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费 如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据. none topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常
2.topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。
3.不更改group.id,只是添加了config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");,consumer不会从最开始的位置消费 ,只要不更改消费组,只会从上次消费结束的地方继续消费;同理不更改group.id,latest也不会只消费最新的数据,只要不更改消费组,只会从上次消费结束的地方继续消费