热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark源码(7)SparkContext初始化源码分析

一、SparkContext初始化上次阅读到Master通知Worker启动了一个Driver,就是启动了一个JVM,并且开始使用反射的方式执行Dri

一、SparkContext 初始化

上次阅读到 Master 通知 Worker 启动了一个 Driver,就是启动了一个 JVM,并且开始使用反射的方式执行 DriverWrapper 的 main 方法。

假设我们提交了一个 JavaWordCount 程序到集群上,其实接下来就开始执行这个方法:

然后开始创建 SparkContext

SparkSession spark = SparkSession.builder().appName("JavaWordCount").getOrCreate();

点进去,可以看到:

SparkContext.getOrCreate(sparkConf)

这里 new 了一个 SparkContext(),然后开始看 SparkContext 的构造方法,这个方法尤其的长,但我们得抓住重点。

重点就在 500 多行的这里,创建了 SchedulerBackend,TaskScheduler,DAGScheduler

下面简单的介绍下这三个核心组件的作用:

(1)TaskScheduler

TaskScheduler 是一个 trait,它的核心任务是提交 TaskSet 到集群并汇报结果,并且向 DAGScheduler 汇报任务的执行情况。

主要实现类是 TaskSchedulerImpl

(2)SchedulerBackend

SchedulerBackend 也是一个 trait,不同的部署模式有不同的实现,比如在 standalone 模式下,实现类是:StandaloneSchedulerBackend。

它内部有两个 Endpoint 组件,一个是 DriverEndpoint ,用来和 Worker 打交道,比如创建 Executor、提交 Task 等;一个是 ClientEndpoint 用来和 master 打交道。

SchedulerBackend 专门负责收集 Worker 上的资源信息,它知道每个任务在提交时,自己拥有多少资源,然后去具体运行 Task。

(3)DAGScheduler

DAGScheduler 的主要作用是:当执行 action 算子的时候,DAGScheduler 会从最后一个 rdd 开始往前递归寻找 ShuffleDependency,每找到一个就会划分一个 stage,最终达到的目的是,根据业务逻辑算子依赖关系,划分成一个个 stage,并且根据 stage 先后关系,把 stage 转化成 ShuffleMapTask 或者 ResultTask,封装成 TaskSet,交给 TaskScheduler 提交。

最后要注意的是,DAGScheduler 和 SchedulerBackend 都是 TaskSchedulerImpl 的成员变量。


二、TaskScheduler 的创建

下面回到创建 TaskScheduler 的地方:

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

这个方法中,主要是根据不同的部署模式,分别创建不同的 TaskScheduler 和 SchedulerBackend。

在 Standalone 模式下:

创建了 TaskSchedulerImpl 对象和 StandaloneSchedulerBackend 对象。

在 TaskSchedulerImpl 的构造方法中,看到了两个重要的属性,dagScheduler 和 backend,其他倒没有什么逻辑了。

然后看到 StandaloneSchedulerBackend 的构造方法

这里要注意一下继承关系,继承了 CoarseGrainedSchedulerBackend

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GwvcTFpp-1637895026068)(C:/Users/wangkai/AppData/Roaming/Typora/typora-user-images/image-20210927222927252.png)]

然后父类里有一个很重要的属性,在执行构造的方法的时候,就会执行这个

image-20210927223019809

也就是说,在这句代码执行的时候:

val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

其实,就创建了一个 DriverEndpoint 组件了,那么紧接着就需要看一下 DriverEndpoint 类的 onStart() 方法:

每间隔一段时间,就给自己发送了一个 ReviveOffers 消息

image-20210927223246758

这个消息的处理逻辑就很长了,需要单独一次来阅读了。可以一句话总结它干的事情:过滤出所有的资源,找到所有需要启动的 TaskSet,分配资源,启动任务。

然后再回来看 StandaloneSchedulerBackend 的构造方法,构造方法里没有逻辑,接着回到 SparkContext 往下看。

接下来,scheduler 调用了 initialize 方法:

scheduler.initialize(backend)

这个方法就把当前的 SchedulerBackend 绑定到自己的成员属性上:

接着 new 了 DAGScheduler

_dagScheduler = new DAGScheduler(this)

然后启动 TaskScheduler()

_taskScheduler.start()

三、TaskScheduler 的启动

首先看到实现类 TaskSchedulerImpl 类的 start() 方法

调动了内部的 SchedulerBackend 的启动方法:

这个方法中,首先封装了一个 CoarseGrainedExecutorBackend 类的启动命令

把这个 Command 封装到 ApplicationDescription,应用程序的描述信息中

然后把 ApplicationDescription 作为成员变量,并且还创建了一个 StandaloneAppClient 对象

image-20210927224615633

然后给启动了起来,new 了一个 ClientEndpoint() ,注意这是一个 Endpoint

image-20210927224712411

看一下它的 onStart() 方法,向 Master 注册

最终是给 Master 发送了一个 RegisterApplication 消息,消息里有 ApplicationDescription(应用程序的描述信息)


四、总结

这一切都做完之后,SparkContext() 最核心的地方就阅读完了,总结一下就是创建了 DAGScheduler、TaskScheduler、SchedulerBackend 三大组件。并且 SchedulerBackend 内部有两个 Endpoint,DriverEndpoint 和 ClientEndpoint,分别从来和 Worker 通信 、和 Master 通信。

下次我们来阅读 Master 处理 Driver 的注册应用程序的消息。

下面是目前的流程图:


推荐阅读
author-avatar
BB15107669916
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有