热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[官方Flink入门笔记]八、TableAPI编程

一.什么是TableAPI为了更好地了解TableAPI,我们先看下Flink都提供了哪些API供用户使用。1.1.FlinkAPI总览Flink根据使用的便捷性
一 .什么是 Table API

为了更好地了解 Table API,我们先看下 Flink 都提供了哪些 API 供用户使用。

1.1. Flink API 总览

在这里插入图片描述
Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐增强,比如 processFunction,是最底层的 API,表达能力最强,我们可以用他来操作 state 和 timer 等复杂功能。Datastream API 相对于 processFunction 来说,又进行了进一步封装,提供了很多标准的语义算子给大家使用,比如我们常用的 window 算子(包括 Tumble, slide,session 等)。那么最上面的 SQL 和 Table API 使用最为便捷,具有自身的很多特点,重点归纳如下:

在这里插入图片描述

  • 第一: Table API & SQL 是一种声明式的 API。用户只需关心做什么,不用关心怎么做,比如图中的 WordCount 例子,只需要关心按什么维度聚合,做哪种类型的聚合,不需要关心底层的实现。

  • 第二: 高性能。Table API & SQL 底层会有优化器对 query 进行优化。举个例子,假如 WordCount 的例子里写了两个 count 操作,优化器会识别并避免重复的计算,计算的时候只保留一个 count 操作,输出的时候再把相同的值输出两遍即可,以达到更好的性能。

  • 第三: 流批统一。上图例子可以发现,API 并没有区分流和批,同一套 query 可以流批复用,对业务开发来说,避免开发两套代码。

  • 第四: 标准稳定。Table API & SQL 遵循 SQL 标准,不易变动。API 比较稳定的好处是不用考虑 API 兼容性问题。

  • 第五: 易理解。语义明确,所见即所得。


1.2. Table API 特性

Table API 自身的特性。主要可以归纳为以下两点:

在这里插入图片描述

  • 第一,Table API 使得多声明的数据处理写起来比较容易。

比如我们有一个 Table&#xff08;tab&#xff09;&#xff0c;并且需要执行一些过滤操作然后输出到结果表&#xff0c;对应的实现是&#xff1a;tab.where(“a <10”).inertInto(“resultTable1”)&#xff1b;此外&#xff0c;我们还需要做另外一些筛选&#xff0c;然后也对结果输出&#xff0c;即 tab.where(“a > 100”).insertInto(“resultTable2”)。你会发现&#xff0c;用 Table API 写起来会非常简洁方便&#xff0c;两行代码就把功能实现了。

  • 第二&#xff0c;Table API 是 Flink 自身的一套 API&#xff0c;这使得我们更容易地去扩展标准的 SQL。
    当然&#xff0c;在扩展 SQL 的时候并不是随意的去扩展&#xff0c;需要考虑 API 的语义、原子性和正交性&#xff0c;并且当且仅当需要的时候才去添加。

对比 SQL&#xff0c;我们可以认为 Table API 是 SQL 的超集。SQL 有的操作&#xff0c;Table API 可以有&#xff0c;然而我们又可以从易用性和功能性地角度对 SQL 进行扩展和提升。

二 .Table API编程

我们来看下如何用 Table API 来编程。本章会先从一个 WordCount 的例子出发&#xff0c;让大家对 Table API 编程先有一个大概的认识&#xff0c;然后再具体介绍一下 Table API 的操作&#xff0c;比如&#xff0c;如何获取一个 Table&#xff0c;如何输出一个 Table&#xff0c;以及如何对 Table 执行查询操作。

2.1. WordCount举例

这是一个完整的&#xff0c;用 java 编写的 batch 版本的 WordCount 例子&#xff0c;此外&#xff0c;还有 scala 和 streaming 版本的 WordCount&#xff0c;都统一上传到了 GitHub 上&#xff08;https://github.com/hequn8128/TableApiDemo&#xff09;&#xff0c;大家可以下载下来尝试运行或者修改。

package org.apache.flink.table.api.example.stream;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class JavaStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv &#61; TableEnvironment.getTableEnvironment(env);String path &#61; JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();tEnv.connect(new FileSystem().path(path)).withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n")).withSchema(new Schema().field("word", Types.STRING)).inAppendMode().registerTableSource("fileSource");Table result &#61; tEnv.scan("fileSource").groupBy("word").select("word, count(1) as count");tEnv.toRetractStream(result, Row.class).print();env.execute();}
}

我们具体看下这个 WordCount 的例子。首先是对 environment 的一些初始化&#xff0c;先通过 ExecutionEnvironment 的 getExecutionEnvironment 方法拿到执行环境&#xff0c;然后再通过 BatchTableEnvironment 的 create 拿到对应的 Table 环境&#xff0c;拿到环境后&#xff0c;我们可以注册 TableSource、TableSink 或执行一些其他操作。

这里需要注意的是&#xff0c;ExecutionEnvironment 跟 BatchTableEnvironment 都是对应 Java 的版本&#xff0c;对于 scala 程序&#xff0c;这里需要是一个对应 scala 版本的 environment。这也是初学者一开始可能会遇到的问题&#xff0c;因为 environent 有很多且容易混淆。为了让大家更好区分这些 environment&#xff0c;下面对 environment 进行了一些归纳。
在这里插入图片描述
这里从 batch/stream&#xff0c;还有 Java/scala&#xff0c;对 environment 进行了分类&#xff0c;对于这些 environment 使用时需要特别注意&#xff0c;不要 import 错了。environment 的问题&#xff0c;社区已经进行了一些讨论&#xff0c;如上图下方的链接&#xff0c;这里不再具体展开。

我们再回到刚刚的 WordCount 的例子&#xff0c;拿到 environment 后&#xff0c;需要做的第二件事情是注册对应的TableSource。

tEnv.connect(new FileSystem().path(path)).withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n")).withSchema(new Schema().field("word", Types.STRING)).registerTableSource("fileSource");

使用起来也非常方便&#xff0c;首先&#xff0c;因为我们要读一个文件&#xff0c;需要指定读取文件的路径&#xff0c;指定了之后&#xff0c;我们需要再描述文件内容的格式&#xff0c;比如他是 csv 的文件并且行分割符是什么。还有就是指定这个文件对应的 Schema 是什么&#xff0c;比如只有一列单词&#xff0c;并且类型是 String。最后&#xff0c;我们需要把 TableSource 注册到 environment 里去。

Table result &#61; tEnv.scan("fileSource").groupBy("word").select("word, count(1) as count");tEnv.toDataSet(result, Row.class).print();

通过 scan 刚才注册好的 TableSource&#xff0c;我们可以拿到一个 Table 对象&#xff0c;并执行相应的一些操作&#xff0c;比如 GroupBy&#xff0c;count。最后&#xff0c;可以把 Table 按 DataSet 的方式进行输出。

以上便是一个 Table API 的 WordCount 完整例子。涉及 Table 的获取&#xff0c;Table 的操作&#xff0c;以及 Table 的输出。接下来会具体介绍如何获取 Table、输出 Table 和执行 Table 操作。

2.2. 如何获取一个Table

获取 Table 大体可以分为两步&#xff0c;

  • 第一步&#xff0c;注册对应的 TableSource&#xff1b;
  • 第二步&#xff0c;调用 Table environement 的 scan 方法获取 Table 对象。

注册 Table Source 又有3种方法&#xff1a;通过 Table descriptor 来注册&#xff0c;通过自定义 source 来注册&#xff0c;或者通过 DataStream 来注册。

具体的注册方式如下图所示&#xff1a;
在这里插入图片描述

2.3. 如何输出一个Table

对应输出 Table&#xff0c;我们也有类似的3种方法&#xff1a;Table descriptor, 自定义 Table sink 以及输出成一个 DataStream。

如下图所示&#xff1a;

在这里插入图片描述

2.4. 如何操作一个Table


2.4.1. Table 操作总览

Table 上有很多操作&#xff0c;比如一些 projection 操作 select、filter、where&#xff1b;聚合操作&#xff0c;如 groupBy、flatAggrgate&#xff1b;还有join操作&#xff0c;等等。
我们以一个具体的例子来介绍下 Table 上各操作的转换流程。
在这里插入图片描述

当我们拿到一个 Table 后&#xff0c;调用 groupBy 会返回一个 GroupedTable。
GroupedTable 里只有 select 方法&#xff0c;对 GroupedTable 调用 select 方法会返回一个 Table。
拿到这个 Table 后&#xff0c;我们可以再调用 Table 上的方法。

图中其他 Table&#xff0c;如 OverWindowedTable 也是类似的流程。
值得注意的是&#xff0c;引入各个类型的 Table 是为了保证 API 的合法性和便利性&#xff0c;比如 groupBy 之后只有 select 操作是有意义的&#xff0c;在编辑器上可以直接点出来。

前面我们提到&#xff0c;可以将 Table API 看成是 SQL 的超集&#xff0c;因此我们也可以对 Table 里的操作按此进行分类&#xff0c;大致分为三类&#xff0c;如下图所示&#xff1a;
在这里插入图片描述

  • 第一类&#xff0c;是跟 SQL 对齐的一些操作&#xff0c;比如 select, filter, join 等。
  • 第二类&#xff0c;是一些提升 Table API 易用性的操作。
  • 第三类&#xff0c;是增强 Table API 功能的一些操作。

第一类操作由于和 SQL 类似&#xff0c;比较容易理解&#xff0c;其次&#xff0c;也可以查看官方的文档&#xff0c;了解具体的方法&#xff0c;所以这里不再展开介绍。

2.4.2. 提升易用性相关操作

介绍易用性之前&#xff0c;我们先来看一个问题。假设我们有一张很大的表&#xff0c;里面有一百列&#xff0c;此时需要去掉一列&#xff0c;那么SQL怎么写&#xff1f;我们需要 select 剩下的 99 列&#xff01;显然这会给用户带来不小的代价。为了解决这个问题&#xff0c;我们在Table上引入了一个 dropColumns 方法。利用 dropColumns 方法&#xff0c;我们便可以只写去掉的列。与此对应&#xff0c;还引入了 addColumns, addOrReplaceColumns 和 renameColumns 方法&#xff0c;如下图所示&#xff1a;
在这里插入图片描述

我们再看下面另一个问题&#xff1a;假设还是一张100列的表&#xff0c;我们需要选第20到第80列&#xff0c;那么我们如何操作呢&#xff1f;为了解决这个问题&#xff0c;我们又引入了 withColumns 和 withoutColumns 方法。对于刚才的问题&#xff0c;我们可以简单地写成 table.select(“withColumns(20 to 80)”)。

在这里插入图片描述
在这里插入图片描述

2.4.3. 增强功能相关操作

该小节会介绍下 TableAggregateFunction 的功能和用法。
在引入 TableAggregateFunction 之前&#xff0c;Flink 里有三种自定义函数&#xff1a;ScalarFunction&#xff0c;TableFunction 和 AggregateFunction。我们可以从输入和输出的维度对这些自定义函数进行分类。
如下图所示&#xff0c;ScalarFunction 是输入一行&#xff0c;输出一行&#xff1b;TableFunction 是输入一行&#xff0c;输出多行&#xff1b;AggregateFunction 是输入多行输出一行。

为了让语义更加完整&#xff0c;Table API 新加了 TableAggregateFunction&#xff0c;它可以接收和输出多行。
TableAggregateFunction 添加后&#xff0c;Table API 的功能可以得到很大的扩展&#xff0c;某种程度上可以用它来实现自定义 operator。
比如&#xff0c;我们可以用 TableAggregateFunction 来实现 TopN。

在这里插入图片描述
TableAggregateFunction 使用也很简单&#xff0c;方法签名和用法如下所示&#xff1a;

用法上&#xff0c;我们只需要调用 table.flatAggregate()&#xff0c;然后传入一个 TableAggregateFunction 实例即可。用户可以继承 TableAggregateFunction 来实现自定义的函数。继承的时候&#xff0c;需要先定义一个 Accumulator&#xff0c;用来存取状态&#xff0c;此外自定义的 TableAggregateFunction 需要实现 accumulate 和 emitValue 方法。accumulate 方法用来处理输入的数据&#xff0c;而 emitValue 方法负责根据 accumulator 里的状态输出结果。

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

官方原文 : https://ververica.cn/developers/table-api-programming/


推荐阅读
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • 本文介绍了一些Java开发项目管理工具及其配置教程,包括团队协同工具worktil,版本管理工具GitLab,自动化构建工具Jenkins,项目管理工具Maven和Maven私服Nexus,以及Mybatis的安装和代码自动生成工具。提供了相关链接供读者参考。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • 安卓select模态框样式改变_微软Office风格的多端(Web、安卓、iOS)组件库——Fabric UI...
    介绍FabricUI是微软开源的一套Office风格的多端组件库,共有三套针对性的组件,分别适用于web、android以及iOS,Fab ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
author-avatar
郭原雪2865
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有