作者:BB15107669916 | 来源:互联网 | 2023-10-10 19:45
一、SparkContext 初始化
上次阅读到 Master 通知 Worker 启动了一个 Driver,就是启动了一个 JVM,并且开始使用反射的方式执行 DriverWrapper 的 main 方法。
假设我们提交了一个 JavaWordCount 程序到集群上,其实接下来就开始执行这个方法:
![](https://img8.php1.cn/3cdc5/1e70e/ae9/94965d9709f43be1.png)
然后开始创建 SparkContext
SparkSession spark = SparkSession.builder().appName("JavaWordCount").getOrCreate();
点进去,可以看到:
SparkContext.getOrCreate(sparkConf)
![](https://img8.php1.cn/3cdc5/1e70e/ae9/b752346cb05416e2.png)
这里 new 了一个 SparkContext(),然后开始看 SparkContext 的构造方法,这个方法尤其的长,但我们得抓住重点。
重点就在 500 多行的这里,创建了 SchedulerBackend,TaskScheduler,DAGScheduler
![](https://img8.php1.cn/3cdc5/1e70e/ae9/e2b5ae90f2aa01a9.png)
下面简单的介绍下这三个核心组件的作用:
(1)TaskScheduler
TaskScheduler 是一个 trait,它的核心任务是提交 TaskSet 到集群并汇报结果,并且向 DAGScheduler 汇报任务的执行情况。
主要实现类是 TaskSchedulerImpl
(2)SchedulerBackend
SchedulerBackend 也是一个 trait,不同的部署模式有不同的实现,比如在 standalone 模式下,实现类是:StandaloneSchedulerBackend。
它内部有两个 Endpoint 组件,一个是 DriverEndpoint ,用来和 Worker 打交道,比如创建 Executor、提交 Task 等;一个是 ClientEndpoint 用来和 master 打交道。
SchedulerBackend 专门负责收集 Worker 上的资源信息,它知道每个任务在提交时,自己拥有多少资源,然后去具体运行 Task。
(3)DAGScheduler
DAGScheduler 的主要作用是:当执行 action 算子的时候,DAGScheduler 会从最后一个 rdd 开始往前递归寻找 ShuffleDependency,每找到一个就会划分一个 stage,最终达到的目的是,根据业务逻辑算子依赖关系,划分成一个个 stage,并且根据 stage 先后关系,把 stage 转化成 ShuffleMapTask 或者 ResultTask,封装成 TaskSet,交给 TaskScheduler 提交。
最后要注意的是,DAGScheduler 和 SchedulerBackend 都是 TaskSchedulerImpl 的成员变量。
二、TaskScheduler 的创建
下面回到创建 TaskScheduler 的地方:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
这个方法中,主要是根据不同的部署模式,分别创建不同的 TaskScheduler 和 SchedulerBackend。
在 Standalone 模式下:
![](https://img8.php1.cn/3cdc5/1e70e/ae9/794ff63ab515f85f.png)
创建了 TaskSchedulerImpl 对象和 StandaloneSchedulerBackend 对象。
在 TaskSchedulerImpl 的构造方法中,看到了两个重要的属性,dagScheduler 和 backend,其他倒没有什么逻辑了。
![](https://img8.php1.cn/3cdc5/1e70e/ae9/58e5d3681882a272.png)
然后看到 StandaloneSchedulerBackend 的构造方法
这里要注意一下继承关系,继承了 CoarseGrainedSchedulerBackend
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GwvcTFpp-1637895026068)(C:/Users/wangkai/AppData/Roaming/Typora/typora-user-images/image-20210927222927252.png)]
然后父类里有一个很重要的属性,在执行构造的方法的时候,就会执行这个
![image-20210927223019809](https://img8.php1.cn/3cdc5/1e70e/ae9/283311456313789b.png)
也就是说,在这句代码执行的时候:
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
其实,就创建了一个 DriverEndpoint 组件了,那么紧接着就需要看一下 DriverEndpoint 类的 onStart() 方法:
每间隔一段时间,就给自己发送了一个 ReviveOffers 消息
![image-20210927223246758](https://img8.php1.cn/3cdc5/1e70e/ae9/f414f64b1f2ceb00.png)
这个消息的处理逻辑就很长了,需要单独一次来阅读了。可以一句话总结它干的事情:过滤出所有的资源,找到所有需要启动的 TaskSet,分配资源,启动任务。
然后再回来看 StandaloneSchedulerBackend 的构造方法,构造方法里没有逻辑,接着回到 SparkContext 往下看。
接下来,scheduler 调用了 initialize 方法:
scheduler.initialize(backend)
这个方法就把当前的 SchedulerBackend 绑定到自己的成员属性上:
![](https://img8.php1.cn/3cdc5/1e70e/ae9/c49f198501917335.png)
接着 new 了 DAGScheduler
_dagScheduler = new DAGScheduler(this)
然后启动 TaskScheduler()
_taskScheduler.start()
三、TaskScheduler 的启动
首先看到实现类 TaskSchedulerImpl 类的 start() 方法
调动了内部的 SchedulerBackend 的启动方法:
![](https://img8.php1.cn/3cdc5/1e70e/ae9/35e375d7dc344234.png)
这个方法中,首先封装了一个 CoarseGrainedExecutorBackend 类的启动命令
![](https://img8.php1.cn/3cdc5/1e70e/ae9/9c155664178b6a58.png)
把这个 Command 封装到 ApplicationDescription,应用程序的描述信息中
![](https://img8.php1.cn/3cdc5/1e70e/ae9/a2e798a301e28682.png)
然后把 ApplicationDescription 作为成员变量,并且还创建了一个 StandaloneAppClient 对象
![image-20210927224615633](https://img8.php1.cn/3cdc5/1e70e/ae9/2f50fd93d37e2286.png)
然后给启动了起来,new 了一个 ClientEndpoint() ,注意这是一个 Endpoint
![image-20210927224712411](https://img8.php1.cn/3cdc5/1e70e/ae9/c962e8c84ef7a914.png)
看一下它的 onStart() 方法,向 Master 注册
![](https://img8.php1.cn/3cdc5/1e70e/ae9/dd8df6cc89d1f1bf.png)
最终是给 Master 发送了一个 RegisterApplication 消息,消息里有 ApplicationDescription(应用程序的描述信息)
![](https://img8.php1.cn/3cdc5/1e70e/ae9/2984bb531fe20c34.png)
四、总结
这一切都做完之后,SparkContext() 最核心的地方就阅读完了,总结一下就是创建了 DAGScheduler、TaskScheduler、SchedulerBackend 三大组件。并且 SchedulerBackend 内部有两个 Endpoint,DriverEndpoint 和 ClientEndpoint,分别从来和 Worker 通信 、和 Master 通信。
下次我们来阅读 Master 处理 Driver 的注册应用程序的消息。
下面是目前的流程图:
![](https://img8.php1.cn/3cdc5/1e70e/ae9/eebfd4e0d77c2352.png)