热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

c++11string转ing_Flink1.11自定义TableSouceamp;Sink变化

Flink1.11VSFlink1.11之前版本01注册Table的方式我们使用FlinkTableAPI&SQL的时候,首先要存在一张表,然后才能去执

                             Flink 1.11    VS   Flink1.11之前版本

01

注册Table的方式

我们使用Flink Table API & SQL的时候,首先要存在一张表,然后才能去执行相应的查询或者插入、更新等SQL语句,那么这张表如何获得呢?

常用的两种方式:

(1)DataStream获取

SingleOutputStreamOperator source = env.addSource(new MySourceFunction())        .name("demo-source") .returns(new RowTypeInfo(Types.SQL_TIMESTAMP, Types.STRING, Types.INT, Types.INT)) .assignTimestampsAndWatermarks(new MydefineWatermark()); tableEnv.registerDataStream("myTble",source,"scoretime.rowtime ,name,score,age"); Table t1 = tableEnv.scan("myTble");

MySourceFunction :自定义Source,用来生成数据

MydefineWatermark:自定义watermark,用来生成水印

上面代码展示了如何从一个datastream到table的转变,在生成table的时候,可以指定相应的rowtime以及proctim时间属性字段,然后将其注册到

StreamTableEnvironment中。

对于DataStream转Table还有额外的接口提供:

Table fromDataStream(DataStream var1); Table fromDataStream(DataStream var1, String var2); void registerDataStream(String var1, DataStream var2); void registerDataStream(String var1, DataStream var2, String var3);

(2)TableSource实现

我们把Kafka的TableSouce拿出来展示:

public abstract class KafkaTableSourceBase implements StreamTableSource, DefinedProctimeAttribute, DefinedRowtimeAttributes, DefinedFieldMapping {

StreamTableSource:流表需要实现的接口

DataStreamgetDataStream(StreamExecutionEnvironment var1);

DefinedRowtimeAttributes:支持event time属性字段需要实现的接口

ListgetRowtimeAttributeDescriptors();

DefinedProctimeAttribute:支持processint time属性字段需要实现的接口

String getProctimeAttribute();

DefinedFieldMapping:字段映射需要实现的接口

Map getFieldMapping();

在这里方便大家去理解,提供一个具有rowtime属性的tablesource的例子:

//自定义table source public static class GeneratorTableSource implements StreamTableSource, DefinedRowtimeAttributes, DefinedFieldMapping { private final int numKeys; private final float recordsPerKeyAndSecond; private final int durationSeconds; private final int offsetSeconds; public GeneratorTableSource(int numKeys, float recordsPerKeyAndSecond, int durationSeconds, int offsetSeconds) { this.numKeys &#61; numKeys; this.recordsPerKeyAndSecond &#61; recordsPerKeyAndSecond; this.durationSeconds &#61; durationSeconds; this.offsetSeconds &#61; offsetSeconds; } &#64;Override public DataStreamgetDataStream(StreamExecutionEnvironment execEnv) {      // 自定义SourceFunction, 功能&#xff1a;      // numKeys &#xff1a;一共生成key的个数      // recordsPerKeyAndSecond &#xff1a;每秒每个key生成数据条数      // durationSeconds &#xff1a;设置执行多长时间 return execEnv.addSource(new Generator(numKeys, recordsPerKeyAndSecond, durationSeconds, offsetSeconds)); } &#64;Override public TypeInformationgetReturnType() {    // 自定义的SourceFunction(Generator)生成数据的TypeInformation return Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING); }    // 设置TableSchema &#64;Override public TableSchema getTableSchema() { return new TableSchema( new String[] {"key", "rowtime", "rowtime_string"}, new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP, Types.STRING}); } &#64;Override public String explainSource() { return "GeneratorTableSource"; }    // 定义ts字段为rowtime属性字段 &#64;Override public ListgetRowtimeAttributeDescriptors() { return Collections.singletonList( new RowtimeAttributeDescriptor( "rowtime", new ExistingField("ts"), new BoundedOutOfOrderTimestamps(0))); }    // table字段映射与dataStream字段关系 &#64;Override public MapgetFieldMapping() { Map mapping &#61; new HashMap<>(); mapping.put("key", "f0"); mapping.put("ts", "f1"); mapping.put("rowtime_string", "f2"); return mapping; } }

02

Table转DataStream

既然DataStream可以转为Table&#xff0c;同理&#xff0c;Flink也为我们提供了反转的接口&#xff0c;如下&#xff1a;

DataStream toAppendStream(Table var1, Class var2); DataStream toAppendStream(Table var1, TypeInformation var2); DataStream toAppendStream(Table var1, Class var2, StreamQueryConfig var3); DataStream toAppendStream(Table var1, TypeInformation var2, StreamQueryConfig var3); DataStream> toRetractStream(Table var1, Class var2); DataStream> toRetractStream(Table var1, TypeInformation var2); DataStream> toRetractStream(Table var1, Class var2, StreamQueryConfig var3); DataStream> toRetractStream(Table var1, TypeInformation var2, StreamQueryConfig var3);

这里具体代码我们不在展开&#xff0c;主要介绍一下Sink的三种模式&#xff1a;

(1)Append: 追加模式&#xff0c;直接往后追加数据&#xff0c;不会变动已经写入的数据&#xff1b;

(2)Upsert: 更新模式&#xff0c;会存在更新已经写入数据的可能&#xff0c;如果变更的数据不存在那么直接写入即可&#xff1b;

(3)Retract: 回撤模式&#xff0c;会产生两条数据&#xff0c;数据格式是Tuple类型,tuple._2是真正的数据&#xff0c;tuple._1是true或者false&#xff0c;新加入的数据是true,之前存在的数据是false

Retract与Upsert也可以从另一方面理解&#xff1a;

Retract是无key的聚合&#xff0c;而 Upsert是有key的聚合

SELECT name, SUM(score) FROM table GROUP BY name;

SELECT COUNT(*) FROM table;

03

Flink 1.11 TableSouce&Sink与之前区别

首先我们看一下Flink 1.11官网为我们提供自定义实现Table Source & Sink的文档&#xff1a;

9714e63a6ab69bf0d031f69cb54cacc5.png

那么Flink 1.11之前是什么样子呢&#xff1f;

51b5671299b946e2c8881826fbc2296d.png

上面我们很容易发现&#xff0c;Flink 1.11之前的TableSource已经变味了Dynamic Table Source&#xff0c; 之前的三种StreamTableSink已经变为了Dynamic Table Sink。

Flink 1.11官网已经为我们提供了一个自定义TableSouce的例子&#xff0c;大家可以进行查看&#xff1a;


https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#full-stack-example

我们可以简单看一下dynamic source的架构&#xff1a;

be9551342ad2ad9e79a33978098184dc.png

这里面主要分为三个阶段以及箭头描述了对象的转换。

Metadata: dynamic table的元数据用CatalogTable实例表示&#xff0c;它可以通过DDL语句或者外部catalog提供

Planning&#xff1a;当执行Table程序的各种计划以及优化的时候&#xff0c;需要把CatalogTable转为相应的DynamicTableSource(如select语句)或者DynamicTableSink(如insert into语句)。DynamicTableSourceFactory和DynamicTableSinkFactory提供特定connect的逻辑&#xff0c;用于将CatalogTable的元数据转换为DynamicTableSource和DynamicTableSink的实例&#xff0c;除了这个之外Factory还会对options进行验证&#xff0c;以及提供解码、编码格式等

Runtime: 逻辑计划完成以后&#xff0c;将会从table connect中获得runtime implementation &#xff0c;运行时逻辑是在flink的core connect里面实现的&#xff0c;例如 InputFormat 或者 SourceFunction 。

主要包含了三种runtime&#xff0c;对应着scan table 、lookup table 、table sink:

  (1)ScanRuntimeProvider

  (2)LookupRuntimeProvider,

    (3)SinkRuntimeProvider

15c2139b40cb1f7d4d6f4eca9f966e2d.pngdb858bb9e12a6bda6178a65fb2ab5709.gif

 &#xff01;关注不迷路~ 各种福利、资源定期分享&#xff01;

▼ 更多技术文章 ▼

Flink1.9 新版本WebUI

Flink1.9源码编译

Flink1.9.1编译完成&#xff0c;启动集群时候遇到的问题--NoSuchMethodError排查思路

Fllink无法找到合适的TableFactory的子类问题的两种解决办法

Flink程序遇到Netty Bug

Flink DataStream注册Table中rowtime方法

Flink--Temporal Tables

Flink SQL之planner-blink优化(上)

Flink SQL之planner-blink优化(中)

Flink SQL之planner-blink优化(下)

基于Blink planner的SQL优化---DAG分段优化

Flink SQL---JSON数据源格式(暗藏杀机)

Flink1.9 使用Hive的局限性(期待下一个版本吧)

SQL入手了解Flink SQL的逻辑/物理计划

深入了解Flink SQL不得不知道的Calcite基本知识

Flink之TableSource & TableSink

Flink table api --window函数

KafkaTableSource如何支持Long类型作为rowtime字段的思考

一个 Blink 小白的成长之路

Flink TaskExecutors统一内存配置社区发展方向

Flink on Kubernetes

SQL On Flink / Spark产品化支持

Flink RestFul API--获取集群upload jar源码解析

Flink之状态处理器使用

Flink之Data Type & Serialization

Flink--线性回归(BGD(batch gradient descent))

聊一聊Flink的双流join

Kubernetes--Pod状态与重启策略

解决KafkaConsumer多线程接入不安全问题(spark streaming 消费kafka)

Spark如何实现动态分配Executor数量

spark资源调优

Spark性能优化之Spark Tungsten内存介绍

Spark特征处理之RFormula源码解析

记一次spark源码的bug排查,顺便简单看一下spark sql底层引擎catalyst处理的流程

Spark的Ml pipeline

Spark特征提取之TF-IDF

MapReduce Shuffle 和 Spark Shuffle 区别看这篇就够了

Spark -- HistoryServer

spark机器学习库评估指标总结

在家办公这些天整理的Kafka知识点大全

kafka管理工具 | 监控 | 偏移量 | 消费| offset&#xff0c;必备技能

幂等性与事务性Producer区别(Kafka)

Superset安装教程

集群灾难---HA模式重要性




推荐阅读
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 从零学Java(10)之方法详解,喷打野你真的没我6!
    本文介绍了从零学Java系列中的第10篇文章,详解了Java中的方法。同时讨论了打野过程中喷打野的影响,以及金色打野刀对经济的增加和线上队友经济的影响。指出喷打野会导致线上经济的消减和影响队伍的团结。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文介绍了机器学习手册中关于日期和时区操作的重要性以及其在实际应用中的作用。文章以一个故事为背景,描述了学童们面对老先生的教导时的反应,以及上官如在这个过程中的表现。同时,文章也提到了顾慎为对上官如的恨意以及他们之间的矛盾源于早年的结局。最后,文章强调了日期和时区操作在机器学习中的重要性,并指出了其在实际应用中的作用和意义。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
author-avatar
漫漫瞹fyS-1908
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有