热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

一文搞懂Flink如何移动计算

对于分布式框架来说,我们经常听到的一句话就是:移动计算,不移动数据。那么对于Flink来说是如何移动计算的呢?我们一起重点看

对于分布式框架来说,我们经常听到的一句话就是:移动计算,不移动数据。那么对于 Flink 来说是如何移动计算的呢?我们一起重点看一下 ExecuteGraph

基本概念

ExecutionJobVertex:表示 JobGraph 的一个计算顶点,每个 ExecutionJobVertex 可能会有很多个 并行的 ExecutionVertex
ExecutionVertex:表示一个并行的 subtask
Execution: 表示 ExecutionVertex 的一次尝试执行

Graph 变化

在这里插入图片描述

源代码

由 一文搞定 Flink Job 提交全流程 我们可以知道在 创建 jobMaster 的同时还 create executionGraph ,一路追踪至 ExecutionGraphBuilder.buildGraph 方法

......
// topologically sort the job vertices and attach the graph to the existing one// 排好序的 topology source->flatMap Filter->sink// 一个 operator chain 形成一个 JobVertex 。single operator as a special operator chainList<JobVertex> sortedTopology &#61; jobGraph.getVerticesSortedTopologicallyFromSources();if (log.isDebugEnabled()) {log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);}executionGraph.attachJobGraph(sortedTopology);......

进入 attachJobGraph

public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " &#43;"vertices and {} intermediate results.",topologiallySorted.size(),tasks.size(),intermediateResults.size());final ArrayList<ExecutionJobVertex> newExecJobVertices &#61; new ArrayList<>(topologiallySorted.size());final long createTimestamp &#61; System.currentTimeMillis();//从 source operator chain 开始for (JobVertex jobVertex : topologiallySorted) {if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {this.isStoppable &#61; false;}/*//在这里生成 ExecutionGraph 的每个节点//首先是进行了一堆赋值&#xff0c;将任务信息交给要生成的图节点&#xff0c;以及设定并行度等等//然后是创建本节点的 IntermediateResult&#xff0c;根据本节点的下游节点的个数确定创建几份//最后是根据设定好的并行度创建用于执行 task 的 ExecutionVertex//如果 job 有设定 inputsplit 的话&#xff0c;这里还要指定 inputsplits*/// create the execution job vertex and attach it to the graph// 已经开始并行化了ExecutionJobVertex ejv &#61; new ExecutionJobVertex(this,jobVertex,1,rpcTimeout,globalModVersion,createTimestamp);/*//这里要处理所有的JobEdge//对每个edge&#xff0c;获取对应的intermediateResult&#xff0c;并记录到本节点的输入上//最后&#xff0c;把每个ExecutorVertex和对应的IntermediateResult关联起来*/ejv.connectToPredecessors(this.intermediateResults);ExecutionJobVertex previousTask &#61; this.tasks.putIfAbsent(jobVertex.getID(), ejv);if (previousTask !&#61; null) {throw new JobException(String.format("Encountered two job vertices with ID %s : previous&#61;[%s] / new&#61;[%s]",jobVertex.getID(), ejv, previousTask));}for (IntermediateResult res : ejv.getProducedDataSets()) {IntermediateResult previousDataSet &#61; this.intermediateResults.putIfAbsent(res.getId(), res);if (previousDataSet !&#61; null) {throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous&#61;[%s] / new&#61;[%s]",res.getId(), res, previousDataSet));}}this.verticesInCreationOrder.add(ejv);this.numVerticesTotal &#43;&#61; ejv.getParallelism();newExecJobVertices.add(ejv);}terminationFuture &#61; new CompletableFuture<>();failoverStrategy.notifyNewVertices(newExecJobVertices);}

关键性方法 new ExecutionJobVertex&#xff0c;除了进行了一些基本的赋值操作外&#xff0c;还并行化了 intermediateResult&#xff0c;并行化了 ExecutionVertex。
说白点&#xff0c;就是创建了几个特别相似的 intermediateResult 对象以及 ExecutionVertex 对象&#xff0c;具体如下

// 已经开始并行化了public ExecutionJobVertex(ExecutionGraph graph,JobVertex jobVertex,int defaultParallelism,Time timeout,long initialGlobalModVersion,long createTimestamp) throws JobException {if (graph &#61;&#61; null || jobVertex &#61;&#61; null) {throw new NullPointerException();}this.graph &#61; graph;this.jobVertex &#61; jobVertex;int vertexParallelism &#61; jobVertex.getParallelism();// 最终的并行度int numTaskVertices &#61; vertexParallelism > 0 ? vertexParallelism : defaultParallelism;final int configuredMaxParallelism &#61; jobVertex.getMaxParallelism();this.maxParallelismConfigured &#61; (VALUE_NOT_SET !&#61; configuredMaxParallelism);// if no max parallelism was configured by the user, we calculate and set a defaultsetMaxParallelismInternal(maxParallelismConfigured ?configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));// verify that our parallelism is not higher than the maximum parallelismif (numTaskVertices > maxParallelism) {throw new JobException(String.format("Vertex %s&#39;s parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",jobVertex.getName(),numTaskVertices,maxParallelism));}this.parallelism &#61; numTaskVertices;this.taskVertices &#61; new ExecutionVertex[numTaskVertices];this.operatorIDs &#61; Collections.unmodifiableList(jobVertex.getOperatorIDs());this.userDefinedOperatorIds &#61; Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());this.inputs &#61; new ArrayList<>(jobVertex.getInputs().size());// take the sharing groupthis.slotSharingGroup &#61; jobVertex.getSlotSharingGroup();this.coLocationGroup &#61; jobVertex.getCoLocationGroup();// setup the coLocation groupif (coLocationGroup !&#61; null && slotSharingGroup &#61;&#61; null) {throw new JobException("Vertex uses a co-location constraint without using slot sharing");}// create the intermediate resultsthis.producedDataSets &#61; new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];// intermediateResult 开始并行化for (int i &#61; 0; i < jobVertex.getProducedDataSets().size(); i&#43;&#43;) {final IntermediateDataSet result &#61; jobVertex.getProducedDataSets().get(i);this.producedDataSets[i] &#61; new IntermediateResult(result.getId(),this,numTaskVertices,result.getResultType());}Configuration jobConfiguration &#61; graph.getJobConfiguration();int maxPriorAttemptsHistoryLength &#61; jobConfiguration !&#61; null ?jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();// create all task vertices// 移动计算// ExecutionVertex 开始并行化for (int i &#61; 0; i < numTaskVertices; i&#43;&#43;) {ExecutionVertex vertex &#61; new ExecutionVertex(this,i,producedDataSets,timeout,initialGlobalModVersion,createTimestamp,maxPriorAttemptsHistoryLength);this.taskVertices[i] &#61; vertex;}// sanity check for the double referencing between intermediate result partitions and execution verticesfor (IntermediateResult ir : this.producedDataSets) {if (ir.getNumberOfAssignedPartitions() !&#61; parallelism) {throw new RuntimeException("The intermediate result&#39;s partitions were not correctly assigned.");}}// set up the input splits, if the vertex has anytry {&#64;SuppressWarnings("unchecked")InputSplitSource<InputSplit> splitSource &#61; (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();if (splitSource !&#61; null) {Thread currentThread &#61; Thread.currentThread();ClassLoader oldContextClassLoader &#61; currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try {inputSplits &#61; splitSource.createInputSplits(numTaskVertices);if (inputSplits !&#61; null) {splitAssigner &#61; splitSource.getInputSplitAssigner(inputSplits);}} finally {currentThread.setContextClassLoader(oldContextClassLoader);}}else {inputSplits &#61; null;}}catch (Throwable t) {throw new JobException("Creating the input splits caused an error: " &#43; t.getMessage(), t);}}

至此移动计算&#xff0c;就算清楚了


推荐阅读
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • Java 11相对于Java 8,OptaPlanner性能提升有多大?
    本文通过基准测试比较了Java 11和Java 8对OptaPlanner的性能提升。测试结果表明,在相同的硬件环境下,Java 11相对于Java 8在垃圾回收方面表现更好,从而提升了OptaPlanner的性能。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 本文介绍了一个Python函数same_set,用于判断两个相等长度的数组是否包含相同的元素。函数会忽略元素的顺序和重复次数,如果两个数组包含相同的元素,则返回1,否则返回0。文章还提供了函数的具体实现代码和样例输入输出。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • Whatsthedifferencebetweento_aandto_ary?to_a和to_ary有什么区别? ... [详细]
  • 欢乐的票圈重构之旅——RecyclerView的头尾布局增加
    项目重构的Git地址:https:github.comrazerdpFriendCircletreemain-dev项目同步更新的文集:http:www.jianshu.comno ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 本文介绍了一个编程问题,要求求解一个给定n阶方阵的鞍点个数。通过输入格式的描述,可以了解到输入的是一个n阶方阵,每个元素都是整数。通过输出格式的描述,可以了解到输出的是鞍点的个数。通过题目集全集传送门,可以了解到提供了两个函数is_line_max和is_rank_min,用于判断一个元素是否为鞍点。本文还提供了三个样例,分别展示了不同情况下的输入和输出。 ... [详细]
  • 本文介绍了OkHttp3的基本使用和特性,包括支持HTTP/2、连接池、GZIP压缩、缓存等功能。同时还提到了OkHttp3的适用平台和源码阅读计划。文章还介绍了OkHttp3的请求/响应API的设计和使用方式,包括阻塞式的同步请求和带回调的异步请求。 ... [详细]
author-avatar
php.com
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有