作者:手机用户2502862657 | 来源:互联网 | 2023-09-03 15:36
By云端上的男人—DT大数据梦工厂上一次笔者大致阐述了一下DAGScheduler中Stage的划分,这次笔者将会阐述一下关于一个Job作业在提交的过程中所涉及到的一些参与相关的实
By云端上的男人—DT大数据梦工厂
上一次笔者大致阐述了一下DAGScheduler中Stage的划分,这次笔者将会阐述一下关于一个Job作业在提交的过程中所涉及到的一些参与相关的实体,如图所示是Driver端Job提交的流程图。
图1
笔者所写的博客面对的人员是有scala基础以及对进程和线程有相关概念的人员和spark会基本使用的人员,如果大家有的对scala不是很熟悉的话,请大家观看家林老师的相关的scala教程链接:链接:http://pan.baidu.com/s/1c2irB9I 密码:t2bf
Spark基础相关的链接链接:链接:http://pan.baidu.com/s/1dEG5m7J 密码:atdx
如图所示,job提交大致划分为10个步骤。
1:用户程序有action算子触发一系列的语句,都是可以看成一个job。例子如下
sc.parallelize(1to100,4).map( ( _ ,1) ).reduceByKey( _ + _ ,3).collect
(注:一个用户程序可以有多个Job,而且在用户程序中生成的Stage可以被多个Job使用,不过笔者在这里目前不讨论这些情况,只讨论最简洁的使用即可)
2:由1提交的语句除去action算子,然后把所有的生成的RDD提交给DAGScheduler实体,并按照宽依赖进行Stage划分。如图所示,两个ShuffleMapStage,一个ResultStage。(这些内容读者可以阅读笔者的上次博客)。
3:把生成的Stage提交给TaskSchedulerImpl实体,TaskSchedulerImpl实体会把Stage封装在一个TaskSetManager实体中,然后TaskSchedulerImpl实体会经过图中4步骤会把已封装TaskSetManager的实体放入到Pool实体中。这个Pool实体则是代表着真正管理所有的Stage。也就是说,在每个Stage之下,Spark又把Stage划分为多个Task。这些Task又分为ShuffleMapTask和ResultTask两种。如上图所示。
4:如步骤3中所示,但是笔者在这里只考虑的是FIFO模式(即TaskSetManager是按照)。关于FAIR公平调度模式,读者可以自行搜索。FIFO意义在于多个Job之间的运行是先入先出,即先到的Job,先获得资源,然后运行完之后在第二个Job开始运行,以此类推,Stage之间因为有依赖,所以其计算也必须有依赖(一般情况),所以在真正把计算任务提交的时候,就需要约束这个条件,即先计算依赖的Stage的任务(Task),然后再计算当前这个Stage所代表的Task。
5:把生成的Task传递给CoarseGrainedSchedulerBackend实体,该实体通过步骤6负责把Task任务传递给driver所对应的还活着的计算节点中。
6:笔者之前阐述过RpcEndPoint相关的知识,而CoarseGrainedSchedulerBackend中存在着这样的一个实体。该实体则会与外部 的RpcEndPoint实体通信,通过Netty框架把我们的Task任务传输过去。
7:既然我们的计算集群是计算任务的,那么想必就必须有结果返回给Driver(注:这里的Driver就是CoarseGrainedSchedulerBackend才对。所以该步骤就是要把计算后的结果返回给Driver。但是笔者在阅读源码的时候看到,计算节点返回的结果因Task任务运行的状态的不同是返回不同的结果。或者说是一类是真正用户需要的结果,则一类则是计算失败返回的失败结果的原因。不过慢慢的笔者也理解这里面所涉及的思路,如果是正常把结果返回了,框架的确并不要关注其他的事情。只有到返回的结果是失败的内容的时候,框架就需要考虑容错。这种思路是很常见。
8:由Driver接收到的结果路由到TaskSchedulerImpl实体来帮忙做处理。Driver
实体只是处理一些在其实体中的元数据的信息,然后如步骤7所说的,根据返回结果的不同让TaskSchedulerImpl实体做不同的处理。
9:其实TaskSchedulerImpl实体只是处理一些在其实体中的元数据的信息而已,真正做结果的处理是路由到TaskResultGetter实体。如上所说,接收到结果一类是真正的数据,另一类则是任务的失败的一些信息。
10:9把接收的数据的结果处理完后,又把控制权交给了TaskSchedulerImpl实体,该实体会根据9得到的结果的类型来做相应的不同的处理的步骤。
。。。。还有好多细节的步骤,笔者并不能一一细说,希望读者自己尝试着试试。或者笔者以后有时间我们在继续这底层的细节。
接下来,我们看一下Driver把Task任务传递到CoarseGrainedExecutorBackend实体端流程,即我们把视角转到了计算节点中的某一个实体来讨论。
1:和上面图中的最后的一步做衔接,CoarseGrainedSchedulerBackend实体把Task任务传递到集群某个CoarseGrainedExecutorBackend实体的计算节点中以便于计算。
2:和上面某些地方类似CoarseGrainedExecutorBackend实体本身就是一个RpcEndpoint实体,是用于接收和处理消息来使用的,也就是说CoarseGrainedExecutorBackend实体把真正要计算的事情交给了Exeutor实体来做计算。
3:如果读者熟悉Java中线程池技术的话,或许会知道一个接口即:
图3
在java中,这样的接口具体实现类,以及相应的工厂方法我们更为的熟悉,即Executors工厂类,在Saprk中,我们也看到了一个类,而且名字是一样,图4所示。这两个类的思想极为相似,即把任务提交给Executor实体(或者是其继承类),然后让其中的某个线程来执行所需要的结果。而在我们的Spark中,是通过使用TaskRunner在此封装来做任务的计算,而真正要计算的还是Driver(在这里CoarseGrainedSchedulerBackend是主要作用)传递过来的Task。
图4
4:这一步骤就是TaskRunner把Task计算好的任务的结果返回给Driver,在这里笔者需要说明说明的是,Task计算的任务 有可能成功,也有一些则不会 ,所以有了两类结果。即之前所说的是Task计算后的结果,一类则是Task计算失败需要返回的失败原因的结果。
5:这一步则是依靠CoarseGrainedExecutorBackend实体,把计算结果反馈给我们的Driver,以便于Driver本身的处理。