热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kudu:删除了不存在的数据报错status=Notfound:keynotfound(error0)

大数据-sparkspark专栏,sprak源码解读九师兄9.90去订阅1.美图2.背景一个实时同步的任务,突然发现写入Kudu的是时候,执行报错020-01-1709:3

1.美图


2.背景

一个实时同步的任务,突然发现写入Kudu的是时候,执行

报错

020-01-17 09:32:22,067 ERROR com.dtwave.meteor.connector.core.service.WorkerSubTask -[deliverRecords][590]: Sub task T_6676615507350773-0 threw an uncaught exception. Sub task is being killed and will not recover util manually restarted.
com.dtwave.meteor.connector.common.exception.SinkException: Sub task T_6676615507350773-0 flush error: Row error for primary key="1000", tablet=null, server=1a65f2ced6b84c2096b0fd39097bfe5e, status=Not found: key not found (error 0)at com.dtwave.meteor.connector.kudu.util.BufferedRecords.flush(BufferedRecords.java:106)at com.dtwave.meteor.connector.kudu.KuduWriter.write(KuduWriter.java:86)at com.dtwave.meteor.connector.kudu.KuduSinkTask.sink(KuduSinkTask.java:73)at com.dtwave.meteor.connector.core.service.WorkerSubTask.deliverRecords(WorkerSubTask.java:576)at com.dtwave.meteor.connector.core.service.WorkerSubTask.poll(WorkerSubTask.java:470)at com.dtwave.meteor.connector.core.service.WorkerSubTask.iteration(WorkerSubTask.java:360)at com.dtwave.meteor.connector.core.service.WorkerSubTask.execute(WorkerSubTask.java:291)at com.dtwave.meteor.connector.core.service.WorkerSubTask.doRun(WorkerSubTask.java:252)

3.原因分析

Apache Kudu不能删除不存在的数据

使用Apache Kudu客户端,对KafkaConnect Sink进行扩展。使用的Apache Kudu的Java客户端。突然有天发现作业无法提交,一直报错。

后来才发现这是Kudu自身的一种校验机制。为了忽略这种校验机制,更符合我们的SQL习惯,我对代码做了改造。

而在Kudu的提交配置.上,使用了手动提交的配置。而且我也建议使用手动提交的配置,这样效率更好,提交后对于异常数据的处理更加完整。
配置方式如下:

session . setFlushMode( SessionConf iguration. FlushMode . MANUAL_ FLUSH);

flush的代码原本是

/*** 最终flush操作,主要解决删除的时候,删除了不存在的数据*/private void terminalFlush() throws KuduException {final java.util.List<OperationResponse> responses = session.flush();for (OperationResponse response : responses) {if (response.hasRowError()) {throw new SinkException("encounter key not found error.More.detail " +"=> table : "+ response.getRowError().getOperation().getTable().getName() +"" +"=> row : "+response.getRowError().getOperation().getRow().stringifyRowKey());}}LOG.debug("Sub task {} flushed sink records", id);}

修改为

/*** 最终flush操作,主要解决删除的时候,删除了不存在的数据*/
private void terminalFlush() throws KuduException {final java.util.List<OperationResponse> responses = session.flush();for (OperationResponse response : responses) {if (!response.hasRowError()) {continue;}String errorString = response.getRowError().toString();// 主要过滤kudu 删除的时候,删除到了不存在的数据if(errorString.contains("key not found")){LOG.warn("encounter key not found error.More.detail " +"=> table : "+ response.getRowError().getOperation().getTable().getName() +"" +"=> row : "+response.getRowError().getOperation().getRow().stringifyRowKey());continue;}if (response.hasRowError()) {throw new SinkException("encounter key not found error.More.detail " +"=> table : "+ response.getRowError().getOperation().getTable().getName() +"" +"=> row : "+response.getRowError().getOperation().getRow().stringifyRowKey());}}LOG.debug("Sub task {} flushed sink records", id);
}

参考:http://www.rengongzineng.com.cn/post/5131.html


推荐阅读
author-avatar
岩蕃wy之人
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有