热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka之auto.offset.reset值解析

今日在使用kafka时,发现将 auto.offset.reset设置为earliest、latest、none都没有达到自

今日在使用kafka时,发现将 auto.offset.reset 设置为earliestlatest、none都没有达到自己预期的效果。

  • earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

模拟一:

方法:本轮配置的topic有被提交过过offset 值,之后将自动提交 offset值设置为false,并且也不手动提交offset 值;同时生产者生产新数据

@Bean
public KafkaConsumer kafkaConsumer() {
Map cOnfig= new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer cOnsumer= new KafkaConsumer<>(config);
return consumer;
}

earliest:读取全部新产生的数据,服务重启也会重复进行消费

 latest:读取全部新产生的数据,服务重启也会重复进行消费

 none:读取全部新产生的数据,服务重启也会重复进行消费

结论:各分区下有已提交的offset,三个参数都会从offset 后面开始消费

模拟二:

我们需要消费kafka上面积累的所有数据(默认48小时),如何消费?

在配置生产者时,我们重新配置一个新的groupId,新的groupId没有配置过 offset,重启程序

@Bean
public KafkaConsumer kafkaConsumer() {
Map cOnfig= new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup-new");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer cOnsumer= new KafkaConsumer<>(config);
return consumer;
}

earliest:读取全部历史数据数据,服务重启也会重复进行消费

  latest:读取生产者当前新产生的数据,不会消费之前的数据

  none:服务重启后,程序报错

 结论:创建新分组时,earliest 每个分区是从头开始消费的。latest读取最新数据,none 没有为消费者组找到先前的offset值时,抛出异常

总结

1.如果存在已经提交的offest时,不管设置为earliest 或者latest 都会从已经提交的offest处开始消费 如果不存在已经提交的offest时,earliest 表示从头开始消费,latest 表示从最新的数据消费,也就是新产生的数据. none topic各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常

2.topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。

3.不更改group.id,只是添加了config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");,consumer不会从最开始的位置消费 ,只要不更改消费组,只会从上次消费结束的地方继续消费;同理不更改group.id,latest也不会只消费最新的数据,只要不更改消费组,只会从上次消费结束的地方继续消费


版权声明:本文为Romantic_lei原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/Romantic_lei/article/details/126597740
推荐阅读
author-avatar
mobiledu2502872023
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有