For distributed frameworks, a line we hear all the time is: move the computation, not the data. So how does Flink move computation? Let's take a close look at the ExecutionGraph.
Basic concepts
ExecutionJobVertex: corresponds to one computation vertex of the JobGraph; each ExecutionJobVertex may contain many parallel ExecutionVertex instances (see the sketch after this list)
ExecutionVertex: one parallel subtask
Execution: one execution attempt of an ExecutionVertex
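To make the nesting concrete, here is a minimal, self-contained sketch. It is only a mental model: the Sketch* classes and fields below are made up for illustration and are not Flink's real ExecutionGraph classes.

import java.util.ArrayList;
import java.util.List;

// One execution attempt of a subtask (Flink: Execution).
class SketchExecution {
    final int attemptNumber;
    SketchExecution(int attemptNumber) { this.attemptNumber = attemptNumber; }
}

// One parallel subtask (Flink: ExecutionVertex), holding its current attempt.
class SketchExecutionVertex {
    final int subtaskIndex;
    SketchExecution currentExecution = new SketchExecution(0);
    SketchExecutionVertex(int subtaskIndex) { this.subtaskIndex = subtaskIndex; }
}

// One JobGraph vertex expanded by its parallelism (Flink: ExecutionJobVertex).
class SketchExecutionJobVertex {
    final String name;
    final SketchExecutionVertex[] taskVertices;
    SketchExecutionJobVertex(String name, int parallelism) {
        this.name = name;
        this.taskVertices = new SketchExecutionVertex[parallelism];
        for (int i = 0; i < parallelism; i++) {
            taskVertices[i] = new SketchExecutionVertex(i);
        }
    }
}

// The whole job (Flink: ExecutionGraph).
public class SketchExecutionGraph {
    final List<SketchExecutionJobVertex> vertices = new ArrayList<>();

    public static void main(String[] args) {
        SketchExecutionGraph graph = new SketchExecutionGraph();
        graph.vertices.add(new SketchExecutionJobVertex("source", 2));
        graph.vertices.add(new SketchExecutionJobVertex("map", 4));
        for (SketchExecutionJobVertex ejv : graph.vertices) {
            System.out.println(ejv.name + " -> " + ejv.taskVertices.length + " subtasks");
        }
    }
}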
How the graph changes
When the JobGraph is turned into an ExecutionGraph, each JobVertex is expanded by its parallelism: it becomes one ExecutionJobVertex holding that many ExecutionVertex subtasks, and each IntermediateDataSet it produces becomes an IntermediateResult with one partition per subtask.
Source code
From 一文搞定 Flink Job 提交全流程 (the earlier post on the full Flink job submission flow) we know that the ExecutionGraph is created at the same time as the JobMaster. Tracing that path leads to the ExecutionGraphBuilder.buildGraph method:
......
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
executionGraph.attachJobGraph(sortedTopology);
......
Step into attachJobGraph:
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
    assertRunningInJobMasterMainThread();

    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
            topologiallySorted.size(), tasks.size(), intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        ExecutionJobVertex ejv = new ExecutionJobVertex(
                this,
                jobVertex,
                1,
                rpcTimeout,
                globalModVersion,
                createTimestamp);

        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    terminationFuture = new CompletableFuture<>();
    failoverStrategy.notifyNewVertices(newExecJobVertices);
}
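The loop above follows a simple pattern: visit the vertices in topological order, connect each new vertex to the intermediate results its predecessors have already registered, then register its own produced results with a putIfAbsent duplicate check. The standalone sketch below isolates that pattern; the job layout (source -> map) and every name in it are hypothetical and are not Flink API.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AttachSketch {
    public static void main(String[] args) {
        // Hypothetical job: source -> map, each vertex producing one intermediate result.
        Map<String, List<String>> producedBy = new LinkedHashMap<>();   // insertion order = topological order
        producedBy.put("source", List.of("ds-1"));
        producedBy.put("map", List.of("ds-2"));
        Map<String, String> consumes = Map.of("map", "ds-1");           // map reads the source's result

        Map<String, String> intermediateResults = new HashMap<>();      // resultId -> producing vertex
        for (String vertex : producedBy.keySet()) {
            // "connectToPredecessors": the upstream result must already be registered.
            String upstream = consumes.get(vertex);
            if (upstream != null) {
                System.out.println(vertex + " connects to result " + upstream
                        + " produced by " + intermediateResults.get(upstream));
            }
            // Register this vertex's produced results, rejecting duplicate IDs.
            for (String resultId : producedBy.get(vertex)) {
                if (intermediateResults.putIfAbsent(resultId, vertex) != null) {
                    throw new IllegalStateException("duplicate intermediate result " + resultId);
                }
            }
        }
    }
}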
The key call is new ExecutionJobVertex. Besides some basic field assignments, it "parallelizes" the IntermediateResult and the ExecutionVertex.
Put plainly, it creates one IntermediateResult per produced data set (each sized to the parallelism) and one near-identical ExecutionVertex per parallel subtask, as the constructor below shows:
public ExecutionJobVertex(
        ExecutionGraph graph,
        JobVertex jobVertex,
        int defaultParallelism,
        Time timeout,
        long initialGlobalModVersion,
        long createTimestamp) throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    this.graph = graph;
    this.jobVertex = jobVertex;

    int vertexParallelism = jobVertex.getParallelism();
    int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

    final int configuredMaxParallelism = jobVertex.getMaxParallelism();

    this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

    setMaxParallelismInternal(maxParallelismConfigured ?
            configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

    if (numTaskVertices > maxParallelism) {
        throw new JobException(
                String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                        jobVertex.getName(),
                        numTaskVertices,
                        maxParallelism));
    }

    this.parallelism = numTaskVertices;

    this.taskVertices = new ExecutionVertex[numTaskVertices];
    this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
    this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());

    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    this.slotSharingGroup = jobVertex.getSlotSharingGroup();
    this.coLocationGroup = jobVertex.getCoLocationGroup();

    if (coLocationGroup != null && slotSharingGroup == null) {
        throw new JobException("Vertex uses a co-location constraint without using slot sharing");
    }

    this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        this.producedDataSets[i] = new IntermediateResult(
                result.getId(),
                this,
                numTaskVertices,
                result.getResultType());
    }

    Configuration jobConfiguration = graph.getJobConfiguration();
    int maxPriorAttemptsHistoryLength = jobConfiguration != null ?
            jobConfiguration.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE) :
            JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue();

    for (int i = 0; i < numTaskVertices; i++) {
        ExecutionVertex vertex = new ExecutionVertex(
                this,
                i,
                producedDataSets,
                timeout,
                initialGlobalModVersion,
                createTimestamp,
                maxPriorAttemptsHistoryLength);

        this.taskVertices[i] = vertex;
    }

    for (IntermediateResult ir : this.producedDataSets) {
        if (ir.getNumberOfAssignedPartitions() != parallelism) {
            throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
        }
    }

    try {
        @SuppressWarnings("unchecked")
        InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

        if (splitSource != null) {
            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(graph.getUserClassLoader());
            try {
                inputSplits = splitSource.createInputSplits(numTaskVertices);

                if (inputSplits != null) {
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        }
        else {
            inputSplits = null;
        }
    }
    catch (Throwable t) {
        throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
    }
}
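The net effect of the two loops in the constructor can be shown with a small standalone sketch: for a hypothetical vertex with parallelism 3 producing one data set, we end up with 3 subtasks and an intermediate result with 3 partitions, one written by each subtask (which is what the getNumberOfAssignedPartitions() != parallelism sanity check verifies). The classes and names below are invented for illustration and are not Flink's real types.

public class ParallelizeSketch {

    // Stand-in for an IntermediateResult: one partition per parallel subtask.
    static class ResultSketch {
        final String[] partitions;
        ResultSketch(String id, int numPartitions) {
            partitions = new String[numPartitions];
            for (int i = 0; i < numPartitions; i++) {
                partitions[i] = id + "-partition-" + i;
            }
        }
    }

    public static void main(String[] args) {
        int parallelism = 3;                                   // jobVertex.getParallelism()
        ResultSketch result = new ResultSketch("ds-1", parallelism);

        // One "subtask" per degree of parallelism, mirroring the taskVertices[] loop.
        String[] subtasks = new String[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = "subtask-" + i + " writes " + result.partitions[i];
        }
        for (String s : subtasks) {
            System.out.println(s);
        }
    }
}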
At this point, "moving the computation" should be clear: the ExecutionGraph expands each JobVertex into its parallel ExecutionVertex subtasks, and it is these subtasks that are later scheduled and deployed to the TaskManagers.