首页
技术博客
PHP教程
数据库技术
前端开发
HTML5
Nginx
php论坛
新用户注册
|
会员登录
PHP教程
技术博客
编程问答
PNG素材
编程语言
前端技术
Android
PHP教程
HTML5教程
数据库
Linux技术
Nginx技术
PHP安全
WebSerer
职场攻略
JavaScript
开放平台
业界资讯
大话程序猿
登录
极速注册
取消
热门标签 | HotTags
install
cpython
dockerfile
httpclient
list
frameworks
javascript
nodejs
header
shell
byte
match
format
keyword
solr
perl
bash
php
dll
controller
python3
case
fetch
spring
vba
import
erlang
typescript
request
jar
golang
config
flutter
bit
command
netty
function
regex
web3
future
hashset
dagger
express
stream
go
lua
less
ip
php5
hash
export
uri
bytecode
python2
bitmap
metadata
version
merge
hashcode
数组
js
utf-8
md5
process
subset
replace
emoji
tree
input
heatmap
ascii
split
filter
httprequest
java
eval
loops
copy
c语言
当前位置:
开发笔记
>
编程语言
> 正文
Kafka核心技术与实战——19|CommitFailedException异常怎么处理?
作者:瑾諪kinti_754 | 来源:互联网 | 2023-06-11 22:38
所谓CommitFailedException,顾名思义就是Consumer客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常很多提交位移的API方法是支持自动错误
所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常
很多提交位移的 API 方法是支持自动错误重试的,比如我们在上一期中提到的commitSync 方法
异常解释
本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例
你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法
在后半部分,社区给出了两个相应的解决办法(即橙色字部分):
增加期望的时间间隔 max.poll.interval.ms 参数值。
减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值
CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时
场景一
当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常
你只需要写一个 Consumer 程序,使用 KafkaConsumer.subscribe 方法随意订阅一个主题
之后设置 Consumer 端参数 max.poll.interval.ms=5 秒
最后在循环调用 KafkaConsumer.poll 方法之间,插入 Thread.sleep(6000) 和手动提交位移,就可以成功复现这个异常了
Properties props = new Properties();…props.put("max.poll.interval.ms", 5000);consumer.subscribe(Arrays.asList("test-topic")); while (true) { ConsumerRecords
records = consumer.poll(Duration.ofSeconds(1)); // 使用 Thread.sleep 模拟真实的消息处理逻辑 Thread.sleep(6000L); consumer.commitSync();}
如果要防止这种场景下抛出异常,你需要简化你的消息处理逻辑。具体来说有 4 种方法
1、缩短单条消息处理的时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
2、增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。在最新版的 Kafka 中,该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。值得一提的是,Kafka 0.10.1.0 之前的版本是没有这个参数的,因此如果你依然在使用 0.10.1.0 之前的客户端 API,那么你需要增加 session.timeout.ms 参数的值。不幸的是,session.timeout.ms 参数还有其他的含义,因此增加该参数的值可能会有其他方面的“不良影响”,这也是社区在 0.10.1.0 版本引入 max.poll.interval.ms 参数,将这部分含义从 session.timeout.ms 中剥离出来的原因之一。
3、减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。如果前两种方法对你都不适用的话,降低此参数值是避免 CommitFailedException 异常最简单的手段。
4、下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。事实上,很多主流的大数据流处理框架使用的都是这个方法,比如 Apache Flink 在集成 Kafka 时,就是创建了多个 KafkaConsumerThread 线程,自行处理多线程间的数据消费。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错
综合以上这 4 个处理方法,我个人推荐你首先尝试采用方法 1 来预防此异常的发生。优化下游系统的消费逻辑是百利而无一害的法子,不像方法 2、3 那样涉及到 Kafka Consumer 端 TPS 与消费延时(Latency)的权衡。如果方法 1 实现起来有难度,那么你可以按照下面的法则来实践方法 2、3
首先,你需要弄清楚你的下游系统消费每条消息的平均延时是多少。比如你的消费逻辑是从 Kafka 获取到消息后写入到下游的 MongoDB 中,假设访问 MongoDB 的平均延时不超过 2 秒,那么你可以认为消息处理需要花费 2 秒的时间。如果按照 max.poll.records 等于 500 来计算,一批消息的总消费时长大约是 1000 秒,因此你的 Consumer 端的 max.poll.interval.ms 参数值就不能低于 1000 秒。如果你使用默认配置,那默认值 5 分钟显然是不够的,你将有很大概率遭遇 CommitFailedException 异常。将 max.poll.interval.ms 增加到 1000 秒以上的做法就属于上面的第 2 种方法。
除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。还拿刚才的例子来说,你可以设置 max.poll.records 值为 150,甚至更少,这样每批消息的总消费时长不会超过 300 秒(150*2=300),即 max.poll.interval.ms 的默认值 5 分钟。这种减少 max.poll.records 值的做法就属于上面提到的方法 3。
场景二
了解这个冷门场景,可以帮助你拓宽 Kafka Consumer 的知识面,也能提前预防一些古怪的问题
学习 Kafka 的消费者,不过大都集中在消费者组上,即所谓的 Consumer Group。其实,Kafka Java Consumer 端还提供了一个名为 Standalone Consumer 的独立消费者
消费者组和独立消费者在使用之前都要指定 group.id
现在问题来了,如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员
虽然说这个场景很冷门,但也并非完全不会遇到。在一个大型公司中,特别是那些将 Kafka 作为全公司级消息引擎系统的公司中,每个部门或团队都可能有自己的消费者应用,谁能保证各自的 Consumer 程序配置的 group.id 没有重复呢?
小结
kafka
io
sum
api
int
list
session
timeout
多线程
写下你的评论吧 !
吐个槽吧,看都看了
会员登录
|
用户注册
推荐阅读
fetch
kafka 0.9+消费者配置参数说明
ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ...
[详细]
蜡笔小新 2023-10-16 10:44:59
import
Java容器中的compareto方法排序原理解析
本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ...
[详细]
蜡笔小新 2023-12-14 13:53:31
jar
XML介绍与使用的概述及标签规则
本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ...
[详细]
蜡笔小新 2023-12-13 17:39:50
spring
SpringJdbcTemplate的使用详解
本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ...
[详细]
蜡笔小新 2023-12-13 14:27:11
spring
2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ...
[详细]
蜡笔小新 2023-12-09 19:11:31
spring
什么是大数据lambda架构
一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ...
[详细]
蜡笔小新 2023-10-17 16:06:09
list
Ansibleplaybook roles安装redis实例(学习笔记二十九)
1、相关redis参数:2、templatesredis.conf配置相关参数:daemonizeyespidfilevarrunredis_{{red ...
[详细]
蜡笔小新 2023-10-17 15:59:52
request
讨伐Java多线程与高并发——MQ篇
本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ...
[详细]
蜡笔小新 2023-10-16 11:14:01
php
druid接入kafka indexing service整个流程
先介绍下我们的druid集群配置Overload1台Coordinator1台Middlemanager3台Broker3台Historical一共12台,其中cold6台,hot ...
[详细]
蜡笔小新 2023-10-15 19:51:21
php
你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路
你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ...
[详细]
蜡笔小新 2023-10-15 17:24:27
import
Table counter missing events when exactly once is enabled and the process is abruptly killed (kill9)
Checklist[x]Ihaveincludedinformationaboutrelevantversions ...
[详细]
蜡笔小新 2023-10-15 15:43:43
php
kafka教程基本概念
kafka教程基本概念 ...
[详细]
蜡笔小新 2023-10-14 18:38:21
php
马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ...
[详细]
蜡笔小新 2023-10-14 14:20:07
import
javascript – 概述在Firefox上无法正常工作
我试图提出一些自定义大纲,以达到一些Web可访问性建议.但我不能用Firefox制作.这就是它在Chrome上的外观:而那个图标实际上是一个锚点.在Firefox上,它只概述了整个 ...
[详细]
蜡笔小新 2023-12-14 10:20:38
list
JavaSE笔试题-接口、抽象类、多态等问题解答
本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ...
[详细]
蜡笔小新 2023-12-14 10:01:13
瑾諪kinti_754
这个家伙很懒,什么也没留下!
Tags | 热门标签
install
cpython
dockerfile
httpclient
list
frameworks
javascript
nodejs
header
shell
byte
match
format
keyword
solr
perl
bash
php
dll
controller
python3
case
fetch
spring
vba
import
erlang
typescript
request
jar
RankList | 热门文章
1
最优页面替换算法
2
VS2010 添加搜索路径和链接库的方法
3
STM32入坑(12)串口发送字节、半字、字、字符串、数组及实现串口控制
4
hanlp的基本使用
5
PostgreSQL使用方法小结(未完待续)
6
c语言不允许对数组大小作动态定义,c语言第07章数组.ppt
7
《踏莎行》翻译 原文赏析诗人宋卢祖皋
8
Autojs自动化 实现自动删除公众号文章(通过订阅号助手删除)
9
Win10双系统安装教程(适用所有计算机)
10
Stata数据可视化: 十幅精美图形的绘制
11
SCCM 2012系列4 配置SCCM2012 Endpoint Protection上
12
node mysql update_Node.js操作mysql数据库增删改查
13
DreamWeaver MX 2004中 设为首页和加入收藏的实现
14
SPOJ (不知道题号是多少) DETER3Find The Determinant III
15
【HDU 1009】FatMouse' Trade
PHP1.CN | 中国最专业的PHP中文社区 |
DevBox开发工具箱
|
json解析格式化
|
PHP资讯
|
PHP教程
|
数据库技术
|
服务器技术
|
前端开发技术
|
PHP框架
|
开发工具
|
在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved |
京公网安备 11010802041100号
|
京ICP备19059560号-4
| PHP1.CN 第一PHP社区 版权所有