热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka使用脚本根据时间重置消费位移,格式你写对了么?

前言kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置
前言

kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置策略如下(引用自官方文档):

--reset-offsets also has following scenarios to choose from (at least one scenario must be selected):

  • --to-datetime : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : Reset offsets to earliest offset.
  • --to-latest : Reset offsets to latest offset.
  • --shift-by : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
  • --from-file : Reset offsets to values defined in CSV file.
  • --to-current : Resets offsets to current offset.
  • --by-duration : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
  • --to-offset : Reset offsets to a specific offset.

就是可以根据时间重置、重置到最小位移、最大位移...等场景。

本文主要聊一下根据时间重置消费位点时候,这个时间格式的问题。

根据时间重置消费位移

示例命令:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000 \--execute

为了看起来直观点,我加了"\"换行展示。

示例命令重置消费组为test_topic_consumer订阅test_topic的消费偏移为2021年11月29号中午12点整的时候消息位点。

实际上,结果不一定是这样,和时区有关。

协调世界时(UTC)

关于utc,查看维基百科

kafka脚本采用的是utc时间标准,与北京时间换算如下:

点击查看:图片来源

北京为东8时区,采用的是utc+08:00,这一段可以看维基百科,下面我复制出一部分,可以了解一下:

UTC+08:00是比世界协调时间快8小时的时区,理论上的位置在东经112度30分127度30分之间,是东盟标准时间的候选时区之一,居住在本时区的人数约有17亿人,占全世界人口的24%,是全世界人口最多的时区。

该时区亦为包括台湾、新加坡、马来西亚、中国、文莱、印尼中部及澳大利亚西部在内的绝大多数汉语使用者所居住的时区。所以互联网上的不少中文网站会使用该时区标记时间,而不论该网站所在地的官方时区为何。

所以,如果你现在使用的是北京时间,如果按示例重置位点,则实际上不是重置到12:00,而且是重置到了20:00的消息位点。

北京时间重置写法

如果是北京时间,则命令应该如下:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000+08:00 \--execute

即时间格式为:2021-11-29T12:00:00.000+08:00 ,表示重置到11月29号的12点。当然这里还有其它写法,下面是源码注释中支持的写法:

(1) yyyy-MM-dd'T'HH:mm:ss.SSS, ex: 2020-11-10T16:51:38.198
(2) yyyy-MM-dd'T'HH:mm:ss.SSSZ, ex: 2020-11-10T16:51:38.198+0800
(3) yyyy-MM-dd'T'HH:mm:ss.SSSX, ex: 2020-11-10T16:51:38.198+08
(4) yyyy-MM-dd'T'HH:mm:ss.SSSXX, ex: 2020-11-10T16:51:38.198+0800
(5) yyyy-MM-dd'T'HH:mm:ss.SSSXXX, ex: 2020-11-10T16:51:38.198+08:00

重置流程

kafka根据时间重置消费位点这一块逻辑也是相当简单:

  1. 获取指定topic的分区(也可以是所有topic)
  2. 将时间转换为对应的时间戳,此时转换的时候就是上面提到的时区问题
  3. 根据时间戳获取对应的消息位点
  4. 修改消费位点为对应的消息位点

下面这段代码是根据时间戳查询位点的逻辑:

private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {val timestampOffsets = topicPartitions.map { topicPartition =>// 指定根据时间戳类型查询位点,除此之外还有最小和最大日志位点等topicPartition -> OffsetSpec.forTimestamp(timestamp)}.toMap// 查询消息位点val offsets = adminClient.listOffsets(timestampOffsets.asJava,withTimeoutMs(new ListOffsetsOptions)).all.get//如果时间戳超过当前最新的消息时间了,就是查不到了,就是未知,下面会把未知这种转换为最新的消费位点val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)}.toMapunsuccessfulOffsetsForTimes.foreach { entry =>println(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +" is empty. Falling back to latest known offset.")}// 将查询到的和未知这种转换为最新的日志位点一起返回,准备重置successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)}
可视化重置

如果觉得重置命令太麻烦,推荐一款可视化控制台:kafka-console-ui,github地址:https://github.com/xxd763795151/kafka-console-ui

新手用这个学习还是比较友好的:


推荐阅读
  • 微信小程序实现类似微博的无限回复功能,内置云开发数据库支持
    本文详细介绍了如何利用微信小程序实现类似于微博的无限回复功能,并充分利用了微信云开发的数据库支持。文中不仅提供了关键代码片段,还包含了完整的页面代码,方便开发者按需使用。此外,HTML页面中包含了一些示例图片,开发者可以根据个人喜好进行替换。文章还将展示详细的数据库结构设计,帮助读者更好地理解和实现这一功能。 ... [详细]
  • 本文详细介绍了在 React Native 开发过程中遇到的 'Could not connect to development server' 错误及其解决方法。该问题不仅影响开发效率,而且难以通过网络资源找到确切的解决方案。本文将提供详细的步骤,帮助开发者快速解决这一常见问题。 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • 思科IOS XE与ISE集成实现TACACS认证配置
    本文详细介绍了如何在思科IOS XE设备上配置TACACS认证,并通过ISE(Identity Services Engine)进行用户管理和授权。配置包括网络拓扑、设备设置和ISE端的具体步骤。 ... [详细]
  • poj 3352 Road Construction ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • Codeforces竞赛解析:Educational Round 84(Div. 2评级),题目A:奇数和问题
    Codeforces竞赛解析:Educational Round 84(Div. 2评级),题目A:奇数和问题 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 在Linux系统中,网络配置是至关重要的任务之一。本文详细解析了Firewalld和Netfilter机制,并探讨了iptables的应用。通过使用`ip addr show`命令来查看网卡IP地址(需要安装`iproute`包),当网卡未分配IP地址或处于关闭状态时,可以通过`ip link set`命令进行配置和激活。此外,文章还介绍了如何利用Firewalld和iptables实现网络流量控制和安全策略管理,为系统管理员提供了实用的操作指南。 ... [详细]
  • 在 CentOS 6.5 系统上部署 VNC 服务器的详细步骤与配置指南
    在 CentOS 6.5 系统上部署 VNC 服务器时,首先需要确认 VNC 服务是否已安装。通常情况下,VNC 服务默认未安装。可以通过运行特定的查询命令来检查其安装状态。如果查询结果为空,则表明 VNC 服务尚未安装,需进行手动安装。此外,建议在安装前确保系统的软件包管理器已更新至最新版本,以避免兼容性问题。 ... [详细]
  • 尽管我们尽最大努力,任何软件开发过程中都难免会出现缺陷。为了更有效地提升对支持部门的协助与支撑,本文探讨了多种策略和最佳实践,旨在通过改进沟通、增强培训和支持流程来减少这些缺陷的影响,并提高整体服务质量和客户满意度。 ... [详细]
  • 在订单服务启动过程中,首先会从Eureka服务器中查找已注册的配置中心,随后从Gitee配置仓库中获取特定的 `order-test.yml` 文件,以确保服务能够正确加载所需的配置信息。这一流程保证了配置管理的灵活性和可维护性。 ... [详细]
  • 在CentOS 7上部署WebRTC网关Janus
    在CentOS 7上部署WebRTC网关Janus ... [详细]
author-avatar
天眞啲笨尛孩
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有