热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【源码】Flink三层图结构——StreamGraph生成前准备Transformation

最近一直在看StreamGraph生成的源码,刚好有点思路,准备动手了发现,如果不说下Transformation后面的StreamGraph会差比较多意思,所以先做点铺垫。##T

最近一直在看 StreamGraph 生成的源码,刚好有点思路,准备动手了发现,
如果不说下 Transformation 后面的 StreamGraph 会差比较多意思,
所以先做点铺垫。


## Transformation

Transformation 类是 Flink 转换算子的基类,实现类有下面这些

AbstractMultipleInputTransformation
CoFeedbackTransformation
FeedbackTransformation
KeyedMultipleInputTransformation
LegacySourceTransformation
MultipleInputTransformation
OneInputTransformation
PartitionTransformation
PhysicalTransformation
SelectTransformation
SideOutputTransformation
SinkTransformation
SourceTransformation
SplitTransformation
TwoInputTransformation
UnionTransformation

类图:

 

从这些 Transformation 中也可以看出Flink 支持的转换类型: Source、Sink、一个输入、两个输入、多个输入、Union、侧输出、Select、分区 等转换操作


## source Transformation 的起始

env.addSource(new SimpleStringSource)

调用 StreamExecutionEnvironment.scala 的 addSource 方法

def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")

val cleanFun = scalaClean(function)
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.addSource(cleanFun, typeInfo))
}

然后调用 javaEnv.addSource 方法

StreamExecutionEnvironment.java

public DataStreamSource addSource(SourceFunction function, TypeInformation typeInfo) {
return addSource(function, "Custom Source", typeInfo);
}
public DataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) {
TypeInformation resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
// 创建 StreamSource
final StreamSource sourceOperator = new StreamSource<>(function);

// 使用 StreamSource 创建 DataStreamSource 同时创建 Source 的Transformation 了, this 指 env
return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
}

DataStreamSource.java 使用输入的 sourceName, operator, outTypeInfo, Parallelism 创建 LegacySourceTransformation

public DataStreamSource(
StreamExecutionEnvironment environment,
TypeInformation outTypeInfo,
StreamSource operator,
boolean isParallel,
String sourceName) {
super(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
this.isParallel = isParallel;
if (!isParallel) {
setParallelism(1);
}
}

最终调用到 DataStream.java 的 DataStream 方法,将生成的 LegacySourceTransformation 放入到 DataStream 中

public DataStream(StreamExecutionEnvironment environment, Transformation transformation) {
this.envirOnment= Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
}

addSource 返回一个 DataStreamSource ,transformation 是 LegacySourceTransformation,并携带 StreamExecutionEnvironment 对象,继续后面算子的调用


## map 算子看 Transformation

stream
.map(str => str)

代码执行到 map 这一行时,会调用到 DataStream.scala 的 map 方法

def map[R: TypeInformation](fun: T => R): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Map function must not be null.")
}
val cleanFun = clean(fun)
val mapper = new MapFunction[T, R] {
def map(in: T): R = cleanFun(in)
}
// 又调用 map
map(mapper)
}
def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R] = {
if (mapper == null) {
throw new NullPointerException("Map function must not be null.")
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
// stream.map 调用到 DataStream.java 中了
asScalaStream(stream.map(mapper, outType).asInstanceOf[JavaStream[R]])
}

注: Flink 主要功能还是在 Java 代码中, Scala Api 就像个外壳,用 Scala 包装了一下,方便 Scala 代码调用,实际上还是会调用到 Java 代码上去

DataStream.java 的 map 方法

这里调用 transform 方法,要构建 Transformation 了,对于这个测试的写法来说, outputType 是 "String", Transformation 名是 "Map"

SimpleOperatorFactory.of(operator)) 获取的工厂类是: SimpleUdfStreamOperatorFactory str => str 就是 Udf

public SingleOutputStreamOperator map(MapFunction mapper, TypeInformation outputType) {
// 可以看到 transform ,这里的 outputType 是 String 了
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
public SingleOutputStreamOperator transform(
String operatorName,
TypeInformation outTypeInfo,
OneInputStreamOperator operator) {
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

 

DataStream.java 的 doTransform 方法创建 map 算子对于的 OneInputTransformation, 同时创建一个新的 DataStream: SingleOutputStreamOperator

// 真正创建 Transformation
protected SingleOutputStreamOperator doTransform(
String operatorName,
TypeInformation outTypeInfo,
StreamOperatorFactory operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 检验和设置 transformation 输出类型
transformation.getOutputType();
// 创建 一个输入的 Transformation, this.transformation 上一算子的 Transformation 做为 当前算子的 输入 Transformation
OneInputTransformation resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
// 创建 一个输出的 StreamOperator 也是 DataStream, 也 携带 environment
SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, resultTransform);
// 讲 创建的 Transformation 放到 ExecutionEnvironment 的 transformations 列表中
getExecutionEnvironment().addOperator(resultTransform);
// 返回 SingleOutputStreamOperator
return returnStream;
}

 

所以,执行完 map 后,返回的也是一个新的 DataStream,这不像有些用户,objectA.methodA().methodB() 每次都返回原来的 objectA


## sink Transformation 的起始

public DataStreamSink addSink(SinkFunction sinkFunction) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 检验输出和设置输出类型
transformation.getOutputType();
// configure the type if needed
// 检查输入方法类型
if (sinkFunction instanceof InputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
// 创建一个 sinkOperator
StreamSink sinkOperator = new StreamSink<>(clean(sinkFunction));
// 使用 sinkOperator 创建 DataStreamSink , 同是创建 SinkTransformation
DataStreamSink sink = new DataStreamSink<>(this, sinkOperator);
// 把 SinkTransformation 添加到 transformations
getExecutionEnvironment().addOperator(sink.getTransformation());
// 返回 DataStreamSink
return sink;
}

DataStreamSink.java  创建 DataStreamSink 的时候,用当前的 DataStream 和 StreamSink 做参数, 当前的 DataStream 做为 StreamSink 的 input Transformation

protected DataStreamSink(DataStream inputStream, StreamSink operator) {
this.transformation = new SinkTransformation(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
}

在创建 Sink 的 DataStream 的时候,将 前一个算子生成的 DataStream 传入 做为了 Sink 的 input Transformation。


## 总结

从 env.addSource.map.addSink 最简单的 Flink 程序,可以看到 Flink 创建 StreamGraph 前的 Transformation 生成过程,其他如: flatMap、filter、union、process 基本类似,其他如 join、window、forward 也相差不大

比如:

val join = process.join(map)
.where(str => str)
.equalTo(str => str)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.apply(new JoinFunction[String, String, String] {
override def join(first: String, second: String): String = {
first + ";" + second
}
})

从 apply 追下去,会 看到  在 WindowedStream.java 的 apply 方法中 调用了 input.transform(opName, resultType, operator) 生成了 一个输出的 Transformation

@PublicEvolving
public SingleOutputStreamOperator transform(
String operatorName,
TypeInformation outTypeInfo,
OneInputStreamOperator operator) {
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
protected SingleOutputStreamOperator doTransform(
String operatorName,
TypeInformation outTypeInfo,
StreamOperatorFactory operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;

 

Transformation 就是 用户代码,转换成 Flink 算子的结果,Transformation

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文



推荐阅读
  • 本文介绍了UUID(通用唯一标识符)的概念及其在JavaScript中生成Java兼容UUID的代码实现与优化技巧。UUID是一个128位的唯一标识符,广泛应用于分布式系统中以确保唯一性。文章详细探讨了如何利用JavaScript生成符合Java标准的UUID,并提供了多种优化方法,以提高生成效率和兼容性。 ... [详细]
  • 本文详细介绍了在编写jQuery插件时需要注意的关键要点,包括模块化支持、命名规范和性能优化等内容,旨在帮助开发者提高插件的质量和可维护性。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 当使用 `new` 表达式(即通过 `new` 动态创建对象)时,会发生两件事:首先,内存被分配用于存储新对象;其次,该对象的构造函数被调用以初始化对象。为了确保资源管理的一致性和避免内存泄漏,建议在使用 `new` 和 `delete` 时保持形式一致。例如,如果使用 `new[]` 分配数组,则应使用 `delete[]` 来释放内存;同样,如果使用 `new` 分配单个对象,则应使用 `delete` 来释放内存。这种一致性有助于防止常见的编程错误,提高代码的健壮性和可维护性。 ... [详细]
  • 本文探讨了使用JavaScript在不同页面间传递参数的技术方法。具体而言,从a.html页面跳转至b.html时,如何携带参数并使b.html替代当前页面显示,而非新开窗口。文中详细介绍了实现这一功能的代码及注释,帮助开发者更好地理解和应用该技术。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 本文总结了JavaScript的核心知识点和实用技巧,涵盖了变量声明、DOM操作、事件处理等重要方面。例如,通过`event.srcElement`获取触发事件的元素,并使用`alert`显示其HTML结构;利用`innerText`和`innerHTML`属性分别设置和获取文本内容及HTML内容。此外,还介绍了如何在表单中动态生成和操作``元素,以便更好地处理用户输入。这些技巧对于提升前端开发效率和代码质量具有重要意义。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • 在 JavaScript 中,浮点数的除法运算常常会出现精度问题。本文介绍了一种方法,通过自定义函数来精确控制除法运算结果的小数位数,确保计算结果的准确性。该方法不仅适用于除法,还可以扩展到其他浮点数运算,有效避免了 JavaScript 在处理浮点数时常见的精度误差。 ... [详细]
  • 本文详细解析了一种实用的函数,用于从URL中提取查询参数。该函数通过处理URL中的搜索部分,能够高效地获取并解析出所需的参数值,适用于各种Web开发场景。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 在PHP中,高效地分割字符串是一项常见的需求。本文探讨了多种技术,用于在特定字符(如“或”)后进行字符串分割。通过使用正则表达式和内置函数,可以实现更加灵活和高效的字符串处理。例如,可以使用 `preg_split` 函数来实现这一目标,该函数允许指定复杂的分隔符模式,从而提高代码的可读性和性能。此外,文章还介绍了如何优化分割操作以减少内存消耗和提高执行速度。 ... [详细]
author-avatar
佩人永雪3
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有