热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka使用脚本根据时间重置消费位移,格式你写对了么?

前言kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置
前言

kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置策略如下(引用自官方文档):

--reset-offsets also has following scenarios to choose from (at least one scenario must be selected):

  • --to-datetime : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : Reset offsets to earliest offset.
  • --to-latest : Reset offsets to latest offset.
  • --shift-by : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
  • --from-file : Reset offsets to values defined in CSV file.
  • --to-current : Resets offsets to current offset.
  • --by-duration : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
  • --to-offset : Reset offsets to a specific offset.

就是可以根据时间重置、重置到最小位移、最大位移...等场景。

本文主要聊一下根据时间重置消费位点时候,这个时间格式的问题。

根据时间重置消费位移

示例命令:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000 \--execute

为了看起来直观点,我加了"\"换行展示。

示例命令重置消费组为test_topic_consumer订阅test_topic的消费偏移为2021年11月29号中午12点整的时候消息位点。

实际上,结果不一定是这样,和时区有关。

协调世界时(UTC)

关于utc,查看维基百科

kafka脚本采用的是utc时间标准,与北京时间换算如下:

点击查看:图片来源

北京为东8时区,采用的是utc+08:00,这一段可以看维基百科,下面我复制出一部分,可以了解一下:

UTC+08:00是比世界协调时间快8小时的时区,理论上的位置在东经112度30分127度30分之间,是东盟标准时间的候选时区之一,居住在本时区的人数约有17亿人,占全世界人口的24%,是全世界人口最多的时区。

该时区亦为包括台湾、新加坡、马来西亚、中国、文莱、印尼中部及澳大利亚西部在内的绝大多数汉语使用者所居住的时区。所以互联网上的不少中文网站会使用该时区标记时间,而不论该网站所在地的官方时区为何。

所以,如果你现在使用的是北京时间,如果按示例重置位点,则实际上不是重置到12:00,而且是重置到了20:00的消息位点。

北京时间重置写法

如果是北京时间,则命令应该如下:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000+08:00 \--execute

即时间格式为:2021-11-29T12:00:00.000+08:00 ,表示重置到11月29号的12点。当然这里还有其它写法,下面是源码注释中支持的写法:

(1) yyyy-MM-dd'T'HH:mm:ss.SSS, ex: 2020-11-10T16:51:38.198
(2) yyyy-MM-dd'T'HH:mm:ss.SSSZ, ex: 2020-11-10T16:51:38.198+0800
(3) yyyy-MM-dd'T'HH:mm:ss.SSSX, ex: 2020-11-10T16:51:38.198+08
(4) yyyy-MM-dd'T'HH:mm:ss.SSSXX, ex: 2020-11-10T16:51:38.198+0800
(5) yyyy-MM-dd'T'HH:mm:ss.SSSXXX, ex: 2020-11-10T16:51:38.198+08:00

重置流程

kafka根据时间重置消费位点这一块逻辑也是相当简单:

  1. 获取指定topic的分区(也可以是所有topic)
  2. 将时间转换为对应的时间戳,此时转换的时候就是上面提到的时区问题
  3. 根据时间戳获取对应的消息位点
  4. 修改消费位点为对应的消息位点

下面这段代码是根据时间戳查询位点的逻辑:

private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {val timestampOffsets = topicPartitions.map { topicPartition =>// 指定根据时间戳类型查询位点,除此之外还有最小和最大日志位点等topicPartition -> OffsetSpec.forTimestamp(timestamp)}.toMap// 查询消息位点val offsets = adminClient.listOffsets(timestampOffsets.asJava,withTimeoutMs(new ListOffsetsOptions)).all.get//如果时间戳超过当前最新的消息时间了,就是查不到了,就是未知,下面会把未知这种转换为最新的消费位点val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)}.toMapunsuccessfulOffsetsForTimes.foreach { entry =>println(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +" is empty. Falling back to latest known offset.")}// 将查询到的和未知这种转换为最新的日志位点一起返回,准备重置successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)}
可视化重置

如果觉得重置命令太麻烦,推荐一款可视化控制台:kafka-console-ui,github地址:https://github.com/xxd763795151/kafka-console-ui

新手用这个学习还是比较友好的:


推荐阅读
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了如何将CIM_DateTime解析为.Net DateTime,并分享了解析过程中可能遇到的问题和解决方法。通过使用DateTime.ParseExact方法和适当的格式字符串,可以成功解析CIM_DateTime字符串。同时还提供了关于WMI和字符串格式的相关信息。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
author-avatar
天眞啲笨尛孩
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有