
Understanding How Flink Moves Computation


For distributed frameworks, a phrase we hear all the time is: move the computation, not the data. So how does Flink actually move computation? Let's take a close look at the ExecutionGraph.

Basic Concepts

ExecutionJobVertex: represents one computation vertex of the JobGraph; each ExecutionJobVertex may contain many parallel ExecutionVertex instances
ExecutionVertex: represents one parallel subtask
Execution: represents one attempt to execute an ExecutionVertex
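
To make the relationship between these three concrete, here is a minimal illustrative sketch (simplified, hypothetical classes, not the real Flink types): one ExecutionJobVertex fans out into parallelism-many ExecutionVertex subtasks, and each ExecutionVertex tracks its current Execution attempt.

// Hypothetical, simplified sketch of the containment relationship only;
// the real Flink classes carry far more state.
public class ExecutionGraphConceptsSketch {

    static class SketchExecutionJobVertex {
        final SketchExecutionVertex[] taskVertices; // one entry per parallel subtask

        SketchExecutionJobVertex(int parallelism) {
            taskVertices = new SketchExecutionVertex[parallelism];
            for (int i = 0; i < parallelism; i++) {
                taskVertices[i] = new SketchExecutionVertex(i);
            }
        }
    }

    static class SketchExecutionVertex {
        final int subtaskIndex;            // which parallel subtask this is
        SketchExecution currentExecution;  // the current attempt of this subtask

        SketchExecutionVertex(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
            this.currentExecution = new SketchExecution(0);
        }
    }

    static class SketchExecution {
        final int attemptNumber;           // bumped on every restart/failover

        SketchExecution(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    public static void main(String[] args) {
        SketchExecutionJobVertex ejv = new SketchExecutionJobVertex(4);
        System.out.println("subtasks: " + ejv.taskVertices.length); // prints 4
    }
}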

How the Graph Evolves

(Figure: StreamGraph → JobGraph → ExecutionGraph → physical execution graph)

Source Code

From the earlier post 一文搞定 Flink Job 提交全流程 we know that the ExecutionGraph is created at the same time as the JobMaster; tracing that path leads to the ExecutionGraphBuilder.buildGraph method:

......
// topologically sort the job vertices and attach the graph to the existing one
// sorted topology: (source -> flatMap), (Filter -> sink)
// each operator chain becomes one JobVertex; a single operator is a special operator chain
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
executionGraph.attachJobGraph(sortedTopology);
......
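
To see what getVerticesSortedTopologicallyFromSources() returns for a concrete job, the JobGraph can be inspected on the client side before submission. The following is a minimal sketch, assuming the Flink 1.x DataStream API that the runtime code above comes from (getStreamGraph() / getJobGraph() are the client-side accessors in that version line); the rebalance() forces a network shuffle, so the operators before and after it end up in different operator chains, i.e. different JobVertices.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InspectJobGraph {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .map(s -> s.toUpperCase())
                .returns(Types.STRING)
                .rebalance()                      // network shuffle => chain boundary
                .filter(s -> !s.isEmpty())
                .print();

        // Client-side view: one JobVertex per operator chain, topologically sorted.
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
            System.out.println(vertex.getName() + "  parallelism=" + vertex.getParallelism());
        }
    }
}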

Stepping into attachJobGraph:

public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
    assertRunningInJobMasterMainThread();

    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
        topologiallySorted.size(),
        tasks.size(),
        intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // start from the source operator chain
    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        /*
         * This is where each node of the ExecutionGraph is created:
         * - first a series of assignments hands the task information to the new graph node and sets the parallelism, etc.
         * - then the node's IntermediateResults are created, one per downstream consumer of this node
         * - finally the ExecutionVertex instances that will run the tasks are created according to the configured parallelism
         * - if the job defines input splits, they are set up here as well
         */
        // create the execution job vertex and attach it to the graph
        // parallelization starts here
        ExecutionJobVertex ejv = new ExecutionJobVertex(
            this,
            jobVertex,
            1,
            rpcTimeout,
            globalModVersion,
            createTimestamp);

        /*
         * Process all JobEdges:
         * - for every edge, look up the corresponding IntermediateResult and record it as an input of this node
         * - finally, associate every ExecutionVertex with its IntermediateResult partitions
         */
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                    res.getId(), res, previousDataSet));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    terminationFuture = new CompletableFuture<>();
    failoverStrategy.notifyNewVertices(newExecJobVertices);
}
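
As a concrete example, using the job from the comments above: if the sorted topology contains two JobVertices, (source -> flatMap) and (Filter -> sink), each with parallelism 2, then this loop creates two ExecutionJobVertex instances, connectToPredecessors wires the second one to the IntermediateResult produced by the first, and numVerticesTotal ends up as 4, one per parallel subtask.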

The key call is new ExecutionJobVertex. Besides a number of basic field assignments, it also parallelizes the IntermediateResult and the ExecutionVertex.
Put plainly, it creates several nearly identical IntermediateResult objects and ExecutionVertex objects, as follows:

// parallelization starts here
public ExecutionJobVertex(
        ExecutionGraph graph,
        JobVertex jobVertex,
        int defaultParallelism,
        Time timeout,
        long initialGlobalModVersion,
        long createTimestamp) throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    this.graph = graph;
    this.jobVertex = jobVertex;

    int vertexParallelism = jobVertex.getParallelism();
    // the effective parallelism
    int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

    final int configuredMaxParallelism = jobVertex.getMaxParallelism();

    this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

    // if no max parallelism was configured by the user, we calculate and set a default
    setMaxParallelismInternal(maxParallelismConfigured ?
        configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

    // verify that our parallelism is not higher than the maximum parallelism
    if (numTaskVertices > maxParallelism) {
        throw new JobException(
            String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                jobVertex.getName(),
                numTaskVertices,
                maxParallelism));
    }

    this.parallelism = numTaskVertices;

    this.taskVertices = new ExecutionVertex[numTaskVertices];
    this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
    this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());

    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // take the sharing group
    this.slotSharingGroup = jobVertex.getSlotSharingGroup();
    this.coLocationGroup = jobVertex.getCoLocationGroup();

    // setup the coLocation group
    if (coLocationGroup != null && slotSharingGroup == null) {
        throw new JobException("Vertex uses a co-location constraint without using slot sharing");
    }

    // create the intermediate results
    this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    // parallelize the IntermediateResults
    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        this.producedDataSets[i] = new IntermediateResult(
            result.getId(),
            this,
            numTaskVertices,
            result.getResultType());
    }

    Configuration jobConfiguration = graph.getJobConfiguration();
    int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
        jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
        JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();

    // create all task vertices
    // this is the "move the computation" part:
    // parallelize into one ExecutionVertex per subtask
    for (int i = 0; i < numTaskVertices; i++) {
        ExecutionVertex vertex = new ExecutionVertex(
            this,
            i,
            producedDataSets,
            timeout,
            initialGlobalModVersion,
            createTimestamp,
            maxPriorAttemptsHistoryLength);

        this.taskVertices[i] = vertex;
    }

    // sanity check for the double referencing between intermediate result partitions and execution vertices
    for (IntermediateResult ir : this.producedDataSets) {
        if (ir.getNumberOfAssignedPartitions() != parallelism) {
            throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
        }
    }

    // set up the input splits, if the vertex has any
    try {
        @SuppressWarnings("unchecked")
        InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

        if (splitSource != null) {
            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(graph.getUserClassLoader());
            try {
                inputSplits = splitSource.createInputSplits(numTaskVertices);

                if (inputSplits != null) {
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        }
        else {
            inputSplits = null;
        }
    }
    catch (Throwable t) {
        throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
    }
}
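
The tail of the constructor, the input-split setup, is where "move the computation to the data" can become literal for split-based sources: the InputSplitAssigner returned by the source decides which subtask gets which split, and for file-like sources Flink's locatable assigner prefers splits stored on the requesting subtask's host. Below is a minimal illustrative sketch of such a locality-aware assignment policy (hypothetical class and method names, not Flink's implementation).

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified locality-aware split assigner, for illustration only.
// Each split remembers the hosts where its data lives; a subtask asking for work
// preferably receives a split that is stored on its own host.
public class LocalityAwareAssignerSketch {

    private final Map<String, Deque<String>> splitsByHost = new HashMap<>();
    private final Deque<String> unassigned = new ArrayDeque<>();

    // Registers a split together with the hosts holding its data.
    public void addSplit(String splitId, List<String> hosts) {
        for (String host : hosts) {
            splitsByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).add(splitId);
        }
        unassigned.add(splitId);
    }

    // Returns a split local to requestingHost if one is left, otherwise any remaining split.
    public String nextSplit(String requestingHost) {
        Deque<String> local = splitsByHost.get(requestingHost);
        while (local != null && !local.isEmpty()) {
            String candidate = local.poll();
            if (unassigned.remove(candidate)) {   // still unassigned -> hand it out
                return candidate;
            }
        }
        return unassigned.poll();                 // null once everything has been assigned
    }

    public static void main(String[] args) {
        LocalityAwareAssignerSketch assigner = new LocalityAwareAssignerSketch();
        assigner.addSplit("split-0", List.of("host-a"));
        assigner.addSplit("split-1", List.of("host-b"));
        System.out.println(assigner.nextSplit("host-b")); // split-1 (local)
        System.out.println(assigner.nextSplit("host-b")); // split-0 (remote fallback)
    }
}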

At this point, how Flink "moves the computation" should be clear: each operator chain is replicated into one ExecutionVertex per parallel subtask, and it is these subtasks, not the data, that are later deployed to the TaskManagers.

