热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

列名或所提供值的数目与表定义不匹配_FlinkSQL实现自定义支持多Topic的KafkaTableSource

导读:在FlinkSQL中TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问。在TableEnv

导读:在FlinkSQL中TableSource提供对存储在外部系统(数据库,键值存储,消息队列)或文件中的数据的访问。 在TableEnvironment中注册TableSource后,可以通过Table API或SQL查询对其进行访问。Flink 已内置了一批常用的Source & Sink,如KafkaTableSource、JDBCTableSource等。本文将基于自定义KafkaTableSource从以下四点展开讨论:

  • 需要自定义KafkaTableSource场景描述
  • 实现自定义KafkaTableSource的思路
  • 自定义KafkaTableSource具体实现方式
  • 自定义KafkaTableSourceFactory具体实现方式

场景描述

场景描述:从多个渠道采集用户行为数据(数据来自不同渠道,但是结构类型一致),并将采集到的数据传输到各个渠道在Kafka所对应的topic中,之后由Flink Job访问Kafka获取数据并进行处理。

解决方案:这里采用FlinkSQL进行Job开发,通过使用内置的KafkaTableSource的connector.topic属性可以实现从指定Kafka topic获取消息。

bsTableEnv.sqlUpdate("CREATE TABLE sourceTable (" + "name STRING," + "country STRING" + ")" + "WITH (" + "'connector.type' = 'kafka'," + "'connector.version' = '"+CONNECTOR_VERSION+"'," + "'connector.topic' = '"+SOURCE_TOPIC+"'," + "'connector.properties.zookeeper.connect' = '"+ZOOKEEPER_CONNECT+"'," + "'connector.properties.bootstrap.servers' = '"+BROKER_LIST+"'," + "'connector.properties.group.id' = '"+GROUP_ID+"'," + "'format.type' = 'json')");

美中不足之处:由于数据虽然来自不同渠道,但是结构类型一致。我们期望可以通过一个Job处理多个topic的消息,而通过分析代码发现KafkaTableSource的connector.topic只能支持单个topic Name。代码如下:

1、KafkaTableSource 通过 createKafkaConsumer 方法来创建 FlinkKafkaConsumer。

8b007d4e011b81c0d4e9b058a47bd0ef.png

2、进入createKafkaConsumer发现其构造方法中对传入的String topic 通过Collections.singletonList(topic) 包装成了一个List。

f1ab2cb9973cbab24dc2b905ebe73c93.png

基于以上分析发现KafkaTableSource的connector.topic并不能支持多个topic的写法。

实现自定义KafkaTableSource的思路

1、既然KafkaTableSource满足不了我们的需求,而官方支持用户自定义Sources,那么我们可以通过自定义一个KafkaTableSource来达到我们的需求,Flink自定义Sources & Sink 文档地址:

//FlinkSQL 1.10--User-defined Sources & Sinks https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#define-a-tablesource

翻阅官方文档,发现可以通过实现StreamTableSource接口来构建一个自定义StreamTableSource

7ffeac2676f80a58f579666390072d93.png

2、接下来,我们查看了StreamTableSource的层次结构,发现Flink还提供了KafkaTableSourceBase类

3548d645f6aabe50015fed75945e1d8c.png

StreamTableSource层次结构

KafkaTableSourceBase类实现了StreamTableSource接口,同时也提供构建一个自定义KafkaTableSource所需的大部分基础内容(包括连接器connector的实现,这里就不深入讨论,感兴趣的朋友可以参考官方文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html)。

这里我们要关注一个点,KafkaTableSourceBase类提供了一个createKafkaConsumer()的抽象方法,用于创建KafkaConsumer。

fbce4022491e03a02ba8a380ef3d3067.png

KafkaTableSourceBase类

a6542ad74e68652ecdb3a63d608f2bb6.png

createKafkaConsumer

3、我们再看看KafkaTableSource类,其继承了KafkaTableSourceBase类,并实现了createKafkaConsumer()方法(这里要注意FlinkKafkaConsumer的版本问题)。

257c3629757b76f4c7abb280b437d408.png

KafkaTableSource

4、深入查看FlinkKafkaConsumer类,发现其提供了多个构造方法,其中包括可接收List topics。

a3fdbbbf6a0c84db2123625a12b4590a.png

FlinkKafkaConsumer

那么到这里我们便可以参考KafkaTableSource例子,通过继承KafkaTableSourceBase并根据自身需求实现createKafkaConsumer方法。

5、这里把类图简单画出来助于理解,层次结构如下:

77be78ac02ac59d1dd015d6a0a5ba36c.png

自定义KafkaTableSource实现方式

1、编写一个MoreTopicKafkaTableSource类并继承 KafkaTableSourceBase,然后重写createKafkaConsumer()方法。这里简单思路为将topic根据逗号进行切割并存成一个List。

aa30cc6b6ce642a470f11c3e50361170.png

MoreTopicKafkaTableSource

2、通过FlinkKafkaConsumer的 FlinkKafkaConsumer(List topics, DeserializationSchema deserializer, Properties props) 构造方法达到可以多topic的效果。

问题:自此自定义的MoreTopicKafkaTableSource完成了,但是如何将Source在环境中注册同时又如何让程序知道我们想用的是MoreTopicKafkaTableSource而不是内置的KafkaTableSource?

自定义KafkaTableSourceFactory实现方式

1、查阅官方文档发现,我们可以通过定义TableFactory来解决以上的问题。文档如下:

88341f3b114a4c9fadfa068a3fdd8fe4.png

TableFactory允许从基于字符串的属性中创建与表相关的不同实例。 调用所有可用的工厂以匹配给定的属性集和相应的工厂类。工厂利用Java的服务提供商接口(SPI)进行发现。 这意味着每个依赖项和JAR文件都应在META_INF / services资源目录中包含一个文件org.apache.flink.table.factories.TableFactory,该文件列出了它提供的所有可用表工厂。

2、接下来,我们参考KafkaTableSource类所对应的KafkaTableSourceSinkFactory工厂类,该工厂类继承了KafkaTableSourceSinkFactoryBase。这里要关注的是KafkaTableSourceSinkFactory实现了createKafkaTableSource()方法,在该方法中创建了KafkaTableSource对象。

7e0cdeeb171a8216b8faf63848abfb14.png

3、同样,我们也创建一个MoreTopicKafkaTableSourceSinkFactory并继承KafkaTableSourceSinkFactoryBase,实现createKafkaTableSource()方法使其创建我们的MoreTopicKafkaTableSource对象。

9bc3e368b6c42d2cde7fbed6a2c7643c.png

4、在这里还有一个重要的点就是requiredContext()方法,其用于匹配工厂。

ff9c388b67150da709349b740200b32f.png

指定为其实现此工厂的上下文。如果满足指定的属性和值集,则框架保证仅与此工厂匹配。典型的属性可能是连接器类型, 格式.type,或更新模式。属性键,如connector.property-version 和 format.property-version后兼容情况保留的。

由于KafkaTableSourceSinkFactory和我们的自定义MoreTopicKafkaTableSourceSinkFactory都继承了KafkaTableSourceSinkFactoryBase,而KafkaTableSourceSinkFactoryBase提供了requiredContext。这会导致框架匹配到两个工厂,出现异常。

解决办法:可以在MoreTopicKafkaTableSourceSinkFactory 自定义多一个key-value。

01fe5476357d4b39a214ffcd1de6b484.png

5、最后一个很重要的步骤是在 resource 目录下添加文件夹 META_INF/services,并创建文件 org.apache.flink.table.factories.TableFactory,在文件中写上新建的 Factory 类,自此结束。

总结
  1. 编写一个MoreTopicKafkaTableSource 继承 KafkaTableSourceBase,并重写createKafkaConsumer方法。
  2. 创建一个MoreTopicKafkaTableSourceSinkFactory并继承KafkaTableSourceSinkFactoryBase,实现createKafkaTableSource()方法使其创建我们的MoreTopicKafkaTableSource。
  3. 在MoreTopicKafkaTableSourceSinkFactory中重写requiredContext并为工厂匹配添加多一个key-value规则使匹配更加精确。
  4. resource 目录下添加文件夹 META_INF/services,并创建文件 org.apache.flink.table.factories.TableFactory,在文件中写上新建的 Factory 类。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。



推荐阅读
  • Nacos 0.3 数据持久化详解与实践
    本文详细介绍了如何将 Nacos 0.3 的数据持久化到 MySQL 数据库,并提供了具体的步骤和注意事项。 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 使用HTML和JavaScript实现视频截图功能
    本文介绍了如何利用HTML和JavaScript实现从远程MP4、本地摄像头及本地上传的MP4文件中截取视频帧,并展示了具体的实现步骤和示例代码。 ... [详细]
  • 一个建表一个执行crud操作建表代码importandroid.content.Context;importandroid.database.sqlite.SQLiteDat ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • 基于iSCSI的SQL Server 2012群集测试(一)SQL群集安装
    一、测试需求介绍与准备公司计划服务器迁移过程计划同时上线SQLServer2012,引入SQLServer2012群集提高高可用性,需要对SQLServ ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 本文详细解析了Java类加载系统的父子委托机制。在Java程序中,.java源代码文件编译后会生成对应的.class字节码文件,这些字节码文件需要通过类加载器(ClassLoader)进行加载。ClassLoader采用双亲委派模型,确保类的加载过程既高效又安全,避免了类的重复加载和潜在的安全风险。该机制在Java虚拟机中扮演着至关重要的角色,确保了类加载的一致性和可靠性。 ... [详细]
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
  • 深入解析:React与Webpack配置进阶指南(第二部分)
    在本篇进阶指南的第二部分中,我们将继续探讨 React 与 Webpack 的高级配置技巧。通过实际案例,我们将展示如何使用 React 和 Webpack 构建一个简单的 Todo 应用程序,具体包括 `TodoApp.js` 文件中的代码实现,如导入 React 和自定义组件 `TodoList`。此外,我们还将深入讲解 Webpack 配置文件的优化方法,以提升开发效率和应用性能。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
author-avatar
难得一见_Eva
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有