热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

c++11string转ing_Flink1.11自定义TableSouceamp;Sink变化

Flink1.11VSFlink1.11之前版本01注册Table的方式我们使用FlinkTableAPI&SQL的时候,首先要存在一张表,然后才能去执

                             Flink 1.11    VS   Flink1.11之前版本

01

注册Table的方式

我们使用Flink Table API & SQL的时候,首先要存在一张表,然后才能去执行相应的查询或者插入、更新等SQL语句,那么这张表如何获得呢?

常用的两种方式:

(1)DataStream获取

SingleOutputStreamOperator source = env.addSource(new MySourceFunction())        .name("demo-source") .returns(new RowTypeInfo(Types.SQL_TIMESTAMP, Types.STRING, Types.INT, Types.INT)) .assignTimestampsAndWatermarks(new MydefineWatermark()); tableEnv.registerDataStream("myTble",source,"scoretime.rowtime ,name,score,age"); Table t1 = tableEnv.scan("myTble");

MySourceFunction :自定义Source,用来生成数据

MydefineWatermark:自定义watermark,用来生成水印

上面代码展示了如何从一个datastream到table的转变,在生成table的时候,可以指定相应的rowtime以及proctim时间属性字段,然后将其注册到

StreamTableEnvironment中。

对于DataStream转Table还有额外的接口提供:

Table fromDataStream(DataStream var1); Table fromDataStream(DataStream var1, String var2); void registerDataStream(String var1, DataStream var2); void registerDataStream(String var1, DataStream var2, String var3);

(2)TableSource实现

我们把Kafka的TableSouce拿出来展示:

public abstract class KafkaTableSourceBase implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes, DefinedFieldMapping {

StreamTableSource:流表需要实现的接口

DataStreamgetDataStream(StreamExecutionEnvironment var1);

DefinedRowtimeAttributes:支持event time属性字段需要实现的接口

ListgetRowtimeAttributeDescriptors();

DefinedProctimeAttribute:支持processint time属性字段需要实现的接口

String getProctimeAttribute();

DefinedFieldMapping:字段映射需要实现的接口

Map getFieldMapping();

在这里方便大家去理解,提供一个具有rowtime属性的tablesource的例子:

//自定义table source public static class GeneratorTableSource implements StreamTableSource, DefinedRowtimeAttributes, DefinedFieldMapping { private final int numKeys; private final float recordsPerKeyAndSecond; private final int durationSeconds; private final int offsetSeconds; public GeneratorTableSource(int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { this.numKeys &#61; numKeys; this.recordsPerKeyAndSecond &#61; recordsPerKeyAndSecond; this.durationSeconds &#61; durationSeconds; this.offsetSeconds &#61; offsetSeconds; } &#64;Override public DataStreamgetDataStream(StreamExecutionEnvironment execEnv) {      // 自定义SourceFunction, 功能&#xff1a;      // numKeys &#xff1a;一共生成key的个数      // recordsPerKeyAndSecond &#xff1a;每秒每个key生成数据条数      // durationSeconds &#xff1a;设置执行多长时间 return execEnv.addSource(new Generator(numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds)); } &#64;Override public TypeInformationgetReturnType() {    // 自定义的SourceFunction(Generator)生成数据的TypeInformation return Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING); }    // 设置TableSchema &#64;Override public TableSchema getTableSchema() { return new TableSchema( new String[] {"key", "rowtime", "rowtime_string"}, new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP, Types.STRING}); } &#64;Override public String explainSource() { return "GeneratorTableSource"; }    // 定义ts字段为rowtime属性字段 &#64;Override public ListgetRowtimeAttributeDescriptors() { return Collections.singletonList( new RowtimeAttributeDescriptor( "rowtime", new ExistingField("ts"), new BoundedOutOfOrderTimestamps(0))); }    // table字段映射与dataStream字段关系 &#64;Override public MapgetFieldMapping() { Map mapping &#61; new HashMap<>(); mapping.put("key", "f0"); mapping.put("ts", "f1"); mapping.put("rowtime_string", "f2"); return mapping; } }

02

Table转DataStream

既然DataStream可以转为Table&#xff0c;同理&#xff0c;Flink也为我们提供了反转的接口&#xff0c;如下&#xff1a;

DataStream toAppendStream(Table var1, Class var2); DataStream toAppendStream(Table var1, TypeInformation var2); DataStream toAppendStream(Table var1, Class var2, StreamQueryConfig var3); DataStream toAppendStream(Table var1, TypeInformation var2, StreamQueryConfig var3); DataStream> toRetractStream(Table var1, Class var2); DataStream> toRetractStream(Table var1, TypeInformation var2); DataStream> toRetractStream(Table var1, Class var2, StreamQueryConfig var3); DataStream> toRetractStream(Table var1, TypeInformation var2, StreamQueryConfig var3);

这里具体代码我们不在展开&#xff0c;主要介绍一下Sink的三种模式&#xff1a;

(1)Append: 追加模式&#xff0c;直接往后追加数据&#xff0c;不会变动已经写入的数据&#xff1b;

(2)Upsert: 更新模式&#xff0c;会存在更新已经写入数据的可能&#xff0c;如果变更的数据不存在那么直接写入即可&#xff1b;

(3)Retract: 回撤模式&#xff0c;会产生两条数据&#xff0c;数据格式是Tuple类型,tuple._2是真正的数据&#xff0c;tuple._1是true或者false&#xff0c;新加入的数据是true,之前存在的数据是false

Retract与Upsert也可以从另一方面理解&#xff1a;

Retract是无key的聚合&#xff0c;而 Upsert是有key的聚合

SELECT name, SUM(score) FROM table GROUP BY name;

SELECT COUNT(*) FROM table;

03

Flink 1.11 TableSouce&Sink与之前区别

首先我们看一下Flink 1.11官网为我们提供自定义实现Table Source & Sink的文档&#xff1a;

9714e63a6ab69bf0d031f69cb54cacc5.png

那么Flink 1.11之前是什么样子呢&#xff1f;

51b5671299b946e2c8881826fbc2296d.png

上面我们很容易发现&#xff0c;Flink 1.11之前的TableSource已经变味了Dynamic Table Source&#xff0c; 之前的三种StreamTableSink已经变为了Dynamic Table Sink。

Flink 1.11官网已经为我们提供了一个自定义TableSouce的例子&#xff0c;大家可以进行查看&#xff1a;


https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#full-stack-example

我们可以简单看一下dynamic source的架构&#xff1a;

be9551342ad2ad9e79a33978098184dc.png

这里面主要分为三个阶段以及箭头描述了对象的转换。

Metadata: dynamic table的元数据用CatalogTable实例表示&#xff0c;它可以通过DDL语句或者外部catalog提供

Planning&#xff1a;当执行Table程序的各种计划以及优化的时候&#xff0c;需要把CatalogTable转为相应的DynamicTableSource(如select语句)或者DynamicTableSink(如insert into语句)。DynamicTableSourceFactory和DynamicTableSinkFactory提供特定connect的逻辑&#xff0c;用于将CatalogTable的元数据转换为DynamicTableSource和DynamicTableSink的实例&#xff0c;除了这个之外Factory还会对options进行验证&#xff0c;以及提供解码、编码格式等

Runtime: 逻辑计划完成以后&#xff0c;将会从table connect中获得runtime implementation &#xff0c;运行时逻辑是在flink的core connect里面实现的&#xff0c;例如 InputFormat 或者 SourceFunction 。

主要包含了三种runtime&#xff0c;对应着scan table 、lookup table 、table sink:

  (1)ScanRuntimeProvider

  (2)LookupRuntimeProvider,

    (3)SinkRuntimeProvider

15c2139b40cb1f7d4d6f4eca9f966e2d.pngdb858bb9e12a6bda6178a65fb2ab5709.gif

 &#xff01;关注不迷路~ 各种福利、资源定期分享&#xff01;

▼ 更多技术文章 ▼

Flink1.9 新版本WebUI

Flink1.9源码编译

Flink1.9.1编译完成&#xff0c;启动集群时候遇到的问题--NoSuchMethodError排查思路

Fllink无法找到合适的TableFactory的子类问题的两种解决办法

Flink程序遇到Netty Bug

Flink DataStream注册Table中rowtime方法

Flink--Temporal Tables

Flink SQL之planner-blink优化(上)

Flink SQL之planner-blink优化(中)

Flink SQL之planner-blink优化(下)

基于Blink planner的SQL优化---DAG分段优化

Flink SQL---JSON数据源格式(暗藏杀机)

Flink1.9 使用Hive的局限性(期待下一个版本吧)

SQL入手了解Flink SQL的逻辑/物理计划

深入了解Flink SQL不得不知道的Calcite基本知识

Flink之TableSource & TableSink

Flink table api --window函数

KafkaTableSource如何支持Long类型作为rowtime字段的思考

一个 Blink 小白的成长之路

Flink TaskExecutors统一内存配置社区发展方向

Flink on Kubernetes

SQL On Flink / Spark产品化支持

Flink RestFul API--获取集群upload jar源码解析

Flink之状态处理器使用

Flink之Data Type & Serialization

Flink--线性回归(BGD(batch gradient descent))

聊一聊Flink的双流join

Kubernetes--Pod状态与重启策略

解决KafkaConsumer多线程接入不安全问题(spark streaming 消费kafka)

Spark如何实现动态分配Executor数量

spark资源调优

Spark性能优化之Spark Tungsten内存介绍

Spark特征处理之RFormula源码解析

记一次spark源码的bug排查,顺便简单看一下spark sql底层引擎catalyst处理的流程

Spark的Ml pipeline

Spark特征提取之TF-IDF

MapReduce Shuffle 和 Spark Shuffle 区别看这篇就够了

Spark -- HistoryServer

spark机器学习库评估指标总结

在家办公这些天整理的Kafka知识点大全

kafka管理工具 | 监控 | 偏移量 | 消费| offset&#xff0c;必备技能

幂等性与事务性Producer区别(Kafka)

Superset安装教程

集群灾难---HA模式重要性




推荐阅读
author-avatar
漫漫瞹fyS-1908
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有