本文翻译自官网:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
TableSource 提供访问存储在外部系统(数据库、key-value 存款,消息队列)或文件中的数据的权限。TableSource 在 TableEnvironment 中注册后,就可以在 Table API 或 SQL 查询中访问了。
TableSink 将表发送到外部存储系统,例如数据库,key-value 存储,消息队列,或文件系统(使用不同的编码(格式),e.g.: CSV,Parquet,or ORC)。
TableFactory 允许将与外部系统的连接的声明与实际实现分开。TableFactory 从标准化的基于字符串的属性创建 table sources 和 sinks 的已配置实例。可以使用描述符或 SQL客户端的YAML配置文件以编程方式生成属性。
请查看通用 concepts and API 页面,以详细了解如何注册TableSource以及如何通过TableSink发出表。 有关如何使用工厂的示例,请参见 内置的源,接收器和格式页面。
TableSource是一个通用接口,可让Table API和SQL查询访问存储在外部系统中的数据。 它提供了表的 schema 以及与该表的 schema 映射到行的记录。 根据是在流查询还是批处理查询中使用TableSource,记录是作为 DataSet 或 DataStream产生。
如果在流查询中使用TableSource,则必须实现StreamTableSource接口;如果在批处理查询中使用TableSource,则必须实现BatchTableSource接口。 TableSource还可以同时实现两个接口,并且可以在流查询和批处理查询中使用。
StreamTableSource和BatchTableSource扩展了定义以下方法的基本接口TableSource:
TableSource[T] {
def getTableSchema: TableSchema
def getReturnType: TypeInformation[T]
def explainSource: String
}
TableSource接口将逻辑表 schema 与返回的DataStream或DataSet的物理类型分开。 因此,表 schema 的所有字段(getTableSchema())必须映射到具有相应物理返回类型(getReturnType())类型的字段。 默认情况下,此映射是基于字段名称完成的。 例如,一个TableSource用两个字段[name:String,size:Integer]定义一个表 schema,它需要一个TypeInformation,其中至少有两个字段分别称为name和size,类型分别为String和Integer。 这可能是PojoTypeInfo或RowTypeInfo,它们具有两个名为name和size且具有匹配类型的字段。
但是,某些类型(例如Tuple或CaseClass类型)确实支持自定义字段名称。 如果TableSource返回具有固定字段名称类型的DataStream或DataSet,则它可以实现DefinedFieldMapping接口以将表 schema 中的字段名称映射到物理返回类型的字段名称。
BatchTableSource接口扩展了TableSource接口并定义了另一个方法:
BatchTableSource[T] extends TableSource[T] { def getDataSet(execEnv: ExecutionEnvironment): DataSet[T] }
StreamTableSource接口扩展了TableSource接口并定义了另一个方法:
StreamTableSource[T] extends TableSource[T] { def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] }
流表API和SQL查询的基于时间的操作(例如窗口聚合或 joins )需要显式指定的时间属性。
TableSource在其表 schema 中将时间属性定义为Types.SQL_TIMESTAMP类型的字段。 与模式中的所有常规字段相反,时间属性不得与表源的返回类型中的物理字段匹配。 相反,TableSource通过实现某个接口来定义时间属性。
处理时间属性通常在流查询中使用。 处理时间属性返回访问该属性的 operator 的当前 wall-clock 时间。 TableSource通过实现DefinedProctimeAttribute接口来定义处理时间属性。 该接口如下所示:
DefinedProctimeAttribute {
def getProctimeAttribute: String
}
注意:StreamTableSource和BatchTableSource都可以实现DefinedProctimeAttribute并定义处理时间属性。 如果是BatchTableSource,则在表扫描期间,使用当前时间戳初始化处理时间字段。
行时间属性是TIMESTAMP类型的属性,在流查询和批处理查询中以统一的方式处理。
可以通过指定SQL_TIMESTAMP类型的表 schema 字段声明为rowtime属性
TableSource通过实现DefinedRowtimeAttributes接口来定义行时间属性。 该接口如下所示:
DefinedRowtimeAttributes {
def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
注意:尽管getRowtimeAttributeDescriptors()方法返回描述符列表,但目前仅支持单个rowtime属性。 我们计划将来删除此限制,并支持具有多个rowtime属性的表。
注意:StreamTableSource和BatchTableSource都可以实现DefinedRowtimeAttributes并定义行时间属性。 无论哪种情况,都使用TimestampExtractor提取rowtime字段。 因此,实现StreamTableSource和BatchTableSource并定义rowtime属性的TableSource为流查询和批处理查询提供了完全相同的数据。
Flink为常见用例提供TimestampExtractor实现。
当前可以使用以下TimestampExtractor实现:
可以通过实现相应的接口来定义自定义TimestampExtractor。
Flink为常见用例提供WatermarkStrategy实现。
当前有以下WatermarkStrategy实现:
可以通过实现相应的接口来定义自定义的WatermarkStrategy。
TableSource通过实现ProjectableTableSource接口来支持投影下推。 该接口定义了一个方法:
ProjectableTableSource[T] {
def projectFields(fields: Array[Int]): TableSource[T]
}
注意:为了使Flink可以将投影下推 table source 与其原始形式区分开,必须重写explainSource方法以包括有关投影字段的信息。
ProjectableTableSource为项目平面字段添加了支持。 如果TableSource定义了具有嵌套模式的表,则可以实现NestedFieldsProjectableTableSource以将投影扩展到嵌套字段。 NestedFieldsProjectableTableSource的定义如下:
NestedFieldsProjectableTableSource[T] {
def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
请注意,不得更改投影字段的类型,但未使用的字段可以设置为null或默认值。
FilterableTableSource接口增加了对将过滤器下推到TableSource的支持。 扩展此接口的TableSource能够过滤记录,以使返回的DataStream或DataSet返回较少的记录。
该接口如下所示:
FilterableTableSource[T] {
def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
def isFilterPushedDown: Boolean
}
注意:为了使Flink能够将过滤器下推 table source 与其原始形式区分开来,必须重写explainSource方法以包括有关下推式过滤器的信息。
注意:这是一项实验功能。 将来的版本中可能会更改接口。 仅Blink planner支持。
LookupableTableSource接口增加了对通过查找方式通过键列访问表的支持。 当用于与维表联接以丰富某些信息时,这非常有用。 如果要在查找模式下使用TableSource,则应在时态表联接语法中使用源。
该接口如下所示:
LookupableTableSource[T] extends TableSource[T] { def getLookupFunction(lookupKeys: Array[String]): TableFunction[T] def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[T] def isAsyncEnabled: Boolean }
TableSink指定如何将表发送到外部系统或位置。 该接口是通用的,因此它可以支持不同的存储位置和格式。 批处理表和流式表有不同的表接收器。
通用接口如下所示:
TableSink[T] { def getOutputType: TypeInformationdef getFieldNames: Array[String] def getFieldTypes: Array[TypeInformation] def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T] }
调用TableSink#configure方法将表的 schema(字段名称和类型)传递给TableSink。 该方法必须返回TableSink的新实例,该实例被配置为发出提供的Table模式。 请注意,提供的TableSchema不应包含计算列以反映物理TableSink的模式。
定义一个外部TableSink以发出一个批处理表。
该接口如下所示:
BatchTableSink[T] extends TableSink[T] { def emitDataSet(dataSet: DataSet[T]): Unit }
定义一个外部TableSink来发出仅具有插入更改的流表。
该接口如下所示:
AppendStreamTableSink[T] extends TableSink[T] { def emitDataStream(dataStream: DataStream[T]): Unit }
如果还通过更新或删除更改来修改表,则将引发TableException。
定义一个外部TableSink以发出具有插入,更新和删除更改的流表。
该接口如下所示:
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def getRecordType: TypeInformation[T] def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit }
该表将被转换为 accumulate 和撤消消息流,这些消息被编码为Java Tuple2。 第一个字段是指示消息类型的布尔标志(true表示插入,false表示删除)。 第二个字段保存请求的类型的记录。
定义一个外部TableSink以发出具有插入,更新和删除更改的流表。
该接口如下所示:
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] { def setKeyFields(keys: Array[String]): Unit def setIsAppendOnly(isAppendOnly: Boolean): Unit def getRecordType: TypeInformation[T] def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit }
该表必须具有唯一的键字段(原子的或复合的)或 append-only 。 如果表没有唯一键并且不是append-only ,则将引发TableException。 该表的唯一键由UpsertStreamTableSink#setKeyFields()方法配置。
该表将转换为upsert和delete消息流,它们被编码为Java Tuple2。 第一个字段是指示消息类型的布尔标志。 第二个字段保存请求的类型T的记录。
具有 true 布尔值字段的消息是已配置密钥的upsert消息。 带有 false 标志的消息是已配置密钥的删除消息。 如果表是 append-only,则所有消息都将具有true标志,并且必须将其解释为插入。
TableFactory允许从基于字符串的属性创建与表相关的不同实例。 调用所有可用的工厂以匹配给定的属性集和相应的工厂类。
工厂利用 Java’s Service Provider Interfaces(SPI)进行发现。 这意味着每个依赖项和JAR文件都应在 META_INF/services 资源目录中包含一个文件org.apache.flink.table.factories.TableFactory,该文件列出了它提供的所有可用表工厂。
每个表工厂都需要实现以下接口:
package org.apache.flink.table.factories trait TableFactory { def requiredContext(): util.Map[String, String] def supportedProperties(): util.List[String] }
为了创建特定实例,工厂类可以实现org.apache.flink.table.factories中提供的一个或多个接口:
BatchTableSourceFactory: Creates a batch table source.
BatchTableSinkFactory: Creates a batch table sink.
StreamTableSourceFactory: Creates a stream table source.
StreamTableSinkFactory: Creates a stream table sink.
DeserializationSchemaFactory: Creates a deserialization schema format.
SerializationSchemaFactory: Creates a serialization schema format.
工厂的发现分为多个阶段:
Discover all available factories. 发现所有可用的工厂。 Filter by factory class (e.g., StreamTableSourceFactory). 按工厂分类过滤 Filter by matching context. 通过匹配上下文进行过滤。 Filter by supported properties. 按支持的属性过滤。 Verify that exactly one factory matches, otherwise throw an AmbiguousTableFactoryException or NoMatchingTableFactoryException. 验证一个工厂是否完全匹配,否则抛出AmbiguousTableFactoryException或NoMatchingTableFactoryException。
以下示例说明如何为自定义流源提供附加的connector.debug属性标志以进行参数化。
import java.util import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.types.Row class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] { override def requiredContext(): util.Map[String, String] = { val context = new util.HashMap[String, String]() context.put("update-mode", "append") context.put("connector.type", "my-system") context } override def supportedProperties(): util.List[String] = { val properties = new util.ArrayList[String]() properties.add("connector.debug") properties } override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = { val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug")) # additional validation of the passed properties can also happen here new MySystemAppendTableSource(isDebug) } }
在SQL Client环境文件中,先前提供的工厂可以声明为:
tables: - name: MySystemTable type: source update-mode: append connector: type: my-system debug: true
将YAML文件转换为扁平化的字符串属性,并使用描述与外部系统的连接的那些属性来调用表工厂:
update-mode=append connector.type=my-system connector.debug=true
注意:属性例如table.#.name或tables.#.type是SQL Client的特定属性,不会传递给任何工厂。 根据执行环境的不同,type属性决定是否需要发现BatchTableSourceFactory / StreamTableSourceFactory(对于 source),BatchTableSinkFactory / StreamTableSinkFactory(对于 sink)还是两者都被发现。
对于使用说明性Scaladoc / Javadoc的类型安全的编程方法,Table&SQL API在org.apache.flink.table.descriptor中提供了描述符,这些描述符可转换为基于字符串的属性。 请参阅 sources,sink 和 format 的内置描述符作为参考。
可以通过扩展ConnectorDescriptor类来定义自定义描述符。
import org.apache.flink.table.descriptors.ConnectorDescriptor import java.util.HashMap import java.util.Map /** * Connector to MySystem with debug mode. */ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) { override protected def toConnectorProperties(): Map[String, String] = { val properties = new HashMap[String, String] properties.put("connector.debug", isDebug.toString) properties } } The descriptor can then be used to create a table with the table environment. val tableEnv: StreamTableEnvironment = // ... tableEnv .connect(new MySystemConnector(isDebug = true)) .withSchema(...) .inAppendMode() .createTemporaryTable("MySystemTable")
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文