一 .什么是 Table API
为了更好地了解 Table API,我们先看下 Flink 都提供了哪些 API 供用户使用。
1.1. Flink API 总览
Flink 根据使用的便捷性和表达能力的强弱提供了 3 层 API,由上到下,表达能力逐渐增强,比如 processFunction,是最底层的 API,表达能力最强,我们可以用他来操作 state 和 timer 等复杂功能。Datastream API 相对于 processFunction 来说,又进行了进一步封装,提供了很多标准的语义算子给大家使用,比如我们常用的 window 算子(包括 Tumble, slide,session 等)。那么最上面的 SQL 和 Table API 使用最为便捷,具有自身的很多特点,重点归纳如下:
-
第一: Table API & SQL 是一种声明式的 API。用户只需关心做什么,不用关心怎么做,比如图中的 WordCount 例子,只需要关心按什么维度聚合,做哪种类型的聚合,不需要关心底层的实现。
-
第二: 高性能。Table API & SQL 底层会有优化器对 query 进行优化。举个例子,假如 WordCount 的例子里写了两个 count 操作,优化器会识别并避免重复的计算,计算的时候只保留一个 count 操作,输出的时候再把相同的值输出两遍即可,以达到更好的性能。
-
第三: 流批统一。上图例子可以发现,API 并没有区分流和批,同一套 query 可以流批复用,对业务开发来说,避免开发两套代码。
-
第四: 标准稳定。Table API & SQL 遵循 SQL 标准,不易变动。API 比较稳定的好处是不用考虑 API 兼容性问题。
-
第五: 易理解。语义明确,所见即所得。
1.2. Table API 特性
Table API 自身的特性。主要可以归纳为以下两点:
- 第一,Table API 使得多声明的数据处理写起来比较容易。
比如我们有一个 Table&#xff08;tab&#xff09;&#xff0c;并且需要执行一些过滤操作然后输出到结果表&#xff0c;对应的实现是&#xff1a;tab.where(“a <10”).inertInto(“resultTable1”)&#xff1b;此外&#xff0c;我们还需要做另外一些筛选&#xff0c;然后也对结果输出&#xff0c;即 tab.where(“a > 100”).insertInto(“resultTable2”)。你会发现&#xff0c;用 Table API 写起来会非常简洁方便&#xff0c;两行代码就把功能实现了。
- 第二&#xff0c;Table API 是 Flink 自身的一套 API&#xff0c;这使得我们更容易地去扩展标准的 SQL。
当然&#xff0c;在扩展 SQL 的时候并不是随意的去扩展&#xff0c;需要考虑 API 的语义、原子性和正交性&#xff0c;并且当且仅当需要的时候才去添加。
对比 SQL&#xff0c;我们可以认为 Table API 是 SQL 的超集。SQL 有的操作&#xff0c;Table API 可以有&#xff0c;然而我们又可以从易用性和功能性地角度对 SQL 进行扩展和提升。
二 .Table API编程
我们来看下如何用 Table API 来编程。本章会先从一个 WordCount 的例子出发&#xff0c;让大家对 Table API 编程先有一个大概的认识&#xff0c;然后再具体介绍一下 Table API 的操作&#xff0c;比如&#xff0c;如何获取一个 Table&#xff0c;如何输出一个 Table&#xff0c;以及如何对 Table 执行查询操作。
2.1. WordCount举例
这是一个完整的&#xff0c;用 java 编写的 batch 版本的 WordCount 例子&#xff0c;此外&#xff0c;还有 scala 和 streaming 版本的 WordCount&#xff0c;都统一上传到了 GitHub 上&#xff08;https://github.com/hequn8128/TableApiDemo&#xff09;&#xff0c;大家可以下载下来尝试运行或者修改。
package org.apache.flink.table.api.example.stream;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;public class JavaStreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv &#61; TableEnvironment.getTableEnvironment(env);String path &#61; JavaStreamWordCount.class.getClassLoader().getResource("words.txt").getPath();tEnv.connect(new FileSystem().path(path)).withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n")).withSchema(new Schema().field("word", Types.STRING)).inAppendMode().registerTableSource("fileSource");Table result &#61; tEnv.scan("fileSource").groupBy("word").select("word, count(1) as count");tEnv.toRetractStream(result, Row.class).print();env.execute();}
}
我们具体看下这个 WordCount 的例子。首先是对 environment 的一些初始化&#xff0c;先通过 ExecutionEnvironment 的 getExecutionEnvironment 方法拿到执行环境&#xff0c;然后再通过 BatchTableEnvironment 的 create 拿到对应的 Table 环境&#xff0c;拿到环境后&#xff0c;我们可以注册 TableSource、TableSink 或执行一些其他操作。
这里需要注意的是&#xff0c;ExecutionEnvironment 跟 BatchTableEnvironment 都是对应 Java 的版本&#xff0c;对于 scala 程序&#xff0c;这里需要是一个对应 scala 版本的 environment。这也是初学者一开始可能会遇到的问题&#xff0c;因为 environent 有很多且容易混淆。为了让大家更好区分这些 environment&#xff0c;下面对 environment 进行了一些归纳。
这里从 batch/stream&#xff0c;还有 Java/scala&#xff0c;对 environment 进行了分类&#xff0c;对于这些 environment 使用时需要特别注意&#xff0c;不要 import 错了。environment 的问题&#xff0c;社区已经进行了一些讨论&#xff0c;如上图下方的链接&#xff0c;这里不再具体展开。
我们再回到刚刚的 WordCount 的例子&#xff0c;拿到 environment 后&#xff0c;需要做的第二件事情是注册对应的TableSource。
tEnv.connect(new FileSystem().path(path)).withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n")).withSchema(new Schema().field("word", Types.STRING)).registerTableSource("fileSource");
使用起来也非常方便&#xff0c;首先&#xff0c;因为我们要读一个文件&#xff0c;需要指定读取文件的路径&#xff0c;指定了之后&#xff0c;我们需要再描述文件内容的格式&#xff0c;比如他是 csv 的文件并且行分割符是什么。还有就是指定这个文件对应的 Schema 是什么&#xff0c;比如只有一列单词&#xff0c;并且类型是 String。最后&#xff0c;我们需要把 TableSource 注册到 environment 里去。
Table result &#61; tEnv.scan("fileSource").groupBy("word").select("word, count(1) as count");tEnv.toDataSet(result, Row.class).print();
通过 scan 刚才注册好的 TableSource&#xff0c;我们可以拿到一个 Table 对象&#xff0c;并执行相应的一些操作&#xff0c;比如 GroupBy&#xff0c;count。最后&#xff0c;可以把 Table 按 DataSet 的方式进行输出。
以上便是一个 Table API 的 WordCount 完整例子。涉及 Table 的获取&#xff0c;Table 的操作&#xff0c;以及 Table 的输出。接下来会具体介绍如何获取 Table、输出 Table 和执行 Table 操作。
2.2. 如何获取一个Table
获取 Table 大体可以分为两步&#xff0c;
- 第一步&#xff0c;注册对应的 TableSource&#xff1b;
- 第二步&#xff0c;调用 Table environement 的 scan 方法获取 Table 对象。
注册 Table Source 又有3种方法&#xff1a;通过 Table descriptor 来注册&#xff0c;通过自定义 source 来注册&#xff0c;或者通过 DataStream 来注册。
具体的注册方式如下图所示&#xff1a;
2.3. 如何输出一个Table
对应输出 Table&#xff0c;我们也有类似的3种方法&#xff1a;Table descriptor, 自定义 Table sink 以及输出成一个 DataStream。
如下图所示&#xff1a;
2.4. 如何操作一个Table
2.4.1. Table 操作总览
Table 上有很多操作&#xff0c;比如一些 projection 操作 select、filter、where&#xff1b;聚合操作&#xff0c;如 groupBy、flatAggrgate&#xff1b;还有join操作&#xff0c;等等。
我们以一个具体的例子来介绍下 Table 上各操作的转换流程。
当我们拿到一个 Table 后&#xff0c;调用 groupBy 会返回一个 GroupedTable。
GroupedTable 里只有 select 方法&#xff0c;对 GroupedTable 调用 select 方法会返回一个 Table。
拿到这个 Table 后&#xff0c;我们可以再调用 Table 上的方法。
图中其他 Table&#xff0c;如 OverWindowedTable 也是类似的流程。
值得注意的是&#xff0c;引入各个类型的 Table 是为了保证 API 的合法性和便利性&#xff0c;比如 groupBy 之后只有 select 操作是有意义的&#xff0c;在编辑器上可以直接点出来。
前面我们提到&#xff0c;可以将 Table API 看成是 SQL 的超集&#xff0c;因此我们也可以对 Table 里的操作按此进行分类&#xff0c;大致分为三类&#xff0c;如下图所示&#xff1a;
- 第一类&#xff0c;是跟 SQL 对齐的一些操作&#xff0c;比如 select, filter, join 等。
- 第二类&#xff0c;是一些提升 Table API 易用性的操作。
- 第三类&#xff0c;是增强 Table API 功能的一些操作。
第一类操作由于和 SQL 类似&#xff0c;比较容易理解&#xff0c;其次&#xff0c;也可以查看官方的文档&#xff0c;了解具体的方法&#xff0c;所以这里不再展开介绍。
2.4.2. 提升易用性相关操作
介绍易用性之前&#xff0c;我们先来看一个问题。假设我们有一张很大的表&#xff0c;里面有一百列&#xff0c;此时需要去掉一列&#xff0c;那么SQL怎么写&#xff1f;我们需要 select 剩下的 99 列&#xff01;显然这会给用户带来不小的代价。为了解决这个问题&#xff0c;我们在Table上引入了一个 dropColumns 方法。利用 dropColumns 方法&#xff0c;我们便可以只写去掉的列。与此对应&#xff0c;还引入了 addColumns, addOrReplaceColumns 和 renameColumns 方法&#xff0c;如下图所示&#xff1a;
我们再看下面另一个问题&#xff1a;假设还是一张100列的表&#xff0c;我们需要选第20到第80列&#xff0c;那么我们如何操作呢&#xff1f;为了解决这个问题&#xff0c;我们又引入了 withColumns 和 withoutColumns 方法。对于刚才的问题&#xff0c;我们可以简单地写成 table.select(“withColumns(20 to 80)”)。
2.4.3. 增强功能相关操作
该小节会介绍下 TableAggregateFunction 的功能和用法。
在引入 TableAggregateFunction 之前&#xff0c;Flink 里有三种自定义函数&#xff1a;ScalarFunction&#xff0c;TableFunction 和 AggregateFunction。我们可以从输入和输出的维度对这些自定义函数进行分类。
如下图所示&#xff0c;ScalarFunction 是输入一行&#xff0c;输出一行&#xff1b;TableFunction 是输入一行&#xff0c;输出多行&#xff1b;AggregateFunction 是输入多行输出一行。
为了让语义更加完整&#xff0c;Table API 新加了 TableAggregateFunction&#xff0c;它可以接收和输出多行。
TableAggregateFunction 添加后&#xff0c;Table API 的功能可以得到很大的扩展&#xff0c;某种程度上可以用它来实现自定义 operator。
比如&#xff0c;我们可以用 TableAggregateFunction 来实现 TopN。
TableAggregateFunction 使用也很简单&#xff0c;方法签名和用法如下所示&#xff1a;
用法上&#xff0c;我们只需要调用 table.flatAggregate()&#xff0c;然后传入一个 TableAggregateFunction 实例即可。用户可以继承 TableAggregateFunction 来实现自定义的函数。继承的时候&#xff0c;需要先定义一个 Accumulator&#xff0c;用来存取状态&#xff0c;此外自定义的 TableAggregateFunction 需要实现 accumulate 和 emitValue 方法。accumulate 方法用来处理输入的数据&#xff0c;而 emitValue 方法负责根据 accumulator 里的状态输出结果。
官方原文 : https://ververica.cn/developers/table-api-programming/