热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark源码剖析(二):任务分配(含源码)

SparkContext所做的准备在SparkContext(这里基于Spark的版本是1.3.1)主要做的工作是:1.创建SparkEnv,里面又一个很重要的对象ActorSys

SparkContext所做的准备

在SparkContext(这里基于Spark的版本是1.3.1)主要做的工作是:

1.创建SparkEnv,里面又一个很重要的对象ActorSystem

2.创建TaskScheduler,这里是根据提交的集群来创建相应的TaskScheduler

3.创建DAGScheduler

4.调用taskScheduler.start()方法启动

部分源码如下:

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
//TODO 该方法创建了一个ActorSystem private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
}
// Create and start the scheduler //TODO 创建一个TaskScheduler private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
//TODO 通过ActorSystem创建了一个Actor,这个心跳是Executors和DriverActor的心跳 private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
//TODO 创建了一个DAGScheduler,以后用来把DAG切分成Stage dagScheduler = new DAGScheduler(this)
} catch {
case e: Exception => {
try {
stop()
} finally {
throw new SparkException("Error while constructing DAGScheduler", e)
}
}
}
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor //TODO 启动taskScheduler taskScheduler.start()
}

SparkContext内的创建TaskScheduler内部流程

这方法是在SparkContext的构造函数中创建,下面是部分源码:

//TODO 根据提交任务时指定的URL创建相应的TaskScheduler
private def createTaskScheduler(sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) =
{
//TODO spark的StandAlone模式
case SPARK_REGEX(sparkUrl) =>
//TODO 创建了一个TaskSchedulerImpl
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
//TODO 创建了一个SparkDeploySchedulerBackend
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
//TODO 调用initialize创建调度器
scheduler.initialize(backend)
(backend, scheduler)
}

这里根据相应的提交创建对应的TaskScheduler对象,然后创建一个SparkDeploySchedulerBacken对象,最后调用initialize()方法创建调度器,接下来我们来看一下创建调度器的内部方法发生了什么?

def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
//默认的调度模式是FIFO
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
}

可以看出,在内部方法里面选择的是调度模式,这里需要注意的是Spark默认的调度模式是FIFO.

启动TaskScheduler

在创建TaskScheduler方法以后,SparkContext构造器使用taskScheduler.start()启动任务调度器,接下来我们看一下内部发生了什么,这里需要注意的是,如果你看的并非Spark集群的实现,需要看的实现类是不不一样的;我在这里看的是TaskSchedulerImpl类。

private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
override def start() {
//TODO 首先掉用SparkDeploySchedulerBackend的start方法
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
SPECULATION_INTERVAL milliseconds) {
Utils.tryOrExit { checkSpeculatableTasks() }
}
}
}

这里首先调用SparkDeploySchedulerBackend的start()方法,我们继续追踪内部源码

override def start() {
//TODO 首先调用父类的start方法来创建DriverActor
super.start()
// The endpoint for executors to talk to us
//TODO 准备一些参数,以后把这些参数封装到一个对象中,然后将该对象发送给Master
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
//TODO 重要:这个参数是以后Executor的实现类
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
//TODO 把参数封装到ApplicationDescription
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec)
//TODO 创建一个AppClient把ApplicationDescription通过主构造器传进去
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
//TODO 然后调用AppClient的start方法,在start方法中创建了一个ClientActor用于与Master通信
client.start()
waitForRegistration()
}

这个方法内部首先调用父类的start()方法,我们可以继续跟进,查看父类的start()方法内部,Spark干了些什么。这里我们需要看一下CoarseGrainedSchedulerBackend类的start()方法

override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}
// TODO (prashant) send conf instead of properties
// TODO 通过ActorSystem创建DriverActor
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}

上面创建了一个DirverActor对象,这个对象是用于和Executor通信的,这里的ActorSystem是在SparkContext的SparkEnv创建的。

接下来,Spark需要封装一些我们设置的参数,具体如下:

//TODO 准备一些参数,以后把这些参数封装到一个对象中,然后将该对象发送给Master
val driverUrl = AkkaUtils.address(
AkkaUtils.protocol(actorSystem),
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil}

然后,需要封装一个很重要的参数,主要主要用于以后Worker进程启动Executor进程,具体如下:

val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
//TODO 重要:这个参数是以后Executor的实现类
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
//TODO 把参数封装到ApplicationDescription
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec)
//TODO 创建一个AppClient把ApplicationDescription通过主构造器传进去
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
//TODO 然后调用AppClient的start方法,在start方法中创建了一个ClientActor用于与Master通信
client.start()

然后创建一个AppClient的对象,然后和通信,这里调用client.start()方法就是调用AppClient的生命周期方法。

接下来,我们将要从preStart()方法开始看起来:

//TODO ClientActor的生命周期方法
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try {
//TODO ClientActor向Master注册
registerWithMaster()
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}

可以看出,这里主要是向Master注册,我们具体看一下注册方法的内部:

def registerWithMaster() {
//TODO 向Master注册
tryRegisterAllMasters()
import context.dispatcher
var retries = 0
registratiOnRetryTimer= Some {
context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
Utils.tryOrExit {
retries += 1
if (registered) {
registrationRetryTimer.foreach(_.cancel())
} else if (retries >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
tryRegisterAllMasters()
}
}
}
}
}
def tryRegisterAllMasters() {
for (masterAkkaUrl <- masterAkkaUrls) {
logInfo("Connecting to master " + masterAkkaUrl + "...")
//TODO 循环所有Master地址,跟Master建立连接
val actor = context.actorSelection(masterAkkaUrl)
//TODO 拿到了Master的一个引用,然后向Master发送注册应用的请求,所有的参数都封装到appDescription
actor ! RegisterApplication(appDescription)
}
}

主要的流程是:

1.向所有的Master发送注册信息,RegisterApplication(appDescription),RegisterApplication是一个用例类,appDescription是上述我们分析的封装参数

2.如果超过一定的次数,那么将会报错

下面,我们就需要看一下Master收到注册信息后,如何处理了!以下代码是Master类中

//TODO ClientActor发送过来的注册应用的消息
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
//TODO 首先把应用的信息放到内存中存储
val app = createApplication(description, sender)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
//TODO 利用持久化引擎保存
persistenceEngine.addApplication(app)
//TODO Master向ClientActor发送注册成功的消息
sender ! RegisteredApplication(app.id, masterUrl)
//TODO 重要:Master开始调度资源,其实就是把任务启动到哪些Worker上
schedule()
}
}

这里面代码的主要流程是将我们提交的程序配置信息保存,并且持久化,这主要是保证高可用,然后在向Driver发送消息表示注册成功,然后调用schedule()方法,这是一个非常重要的方法。

这里有两种调度的方式,一种是尽量打散,一种是尽量集中。

//TODO 下面是两种调度方式,一中是尽量打散,另一种是尽量集中
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
while (toAssign > 0) {
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
//TODO Master发送消息让Worker启动Executor
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// Pack each app into as few nodes as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
//TODO Master发送消息让Worker启动Executor
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
}
}
}
}

尽量打散方法的逻辑是:

1.首先进行对Worker进行筛选,通过是否存活,是否已经启动该应用程序,CPU核数大小倒叙排列

2.创建一个待分配资源的数组,这个数组大小的可用的Worker的数量

3.然后通过轮询的方法进行分配

尽量集中的方法的逻辑是:

1.首先是根据Worker的可用内存和是否已经启动该应用程序来筛选Worker

2.尽量将一个Worker的所有资源分配,如果一个Worker分配后还是不能满足,继续第二个Worker

3.然后再继续进行分配其他应用

启动Executor

在任务分配完成后,就是启动Executor

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
//TODO 记录该Worker使用的资源
worker.addExecutor(exec)
//TODO Master发送消息给Worker,把参数通过case class传递给Worker,让他启动Executor,
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
//TODO Master向ClientActor发送消息,告诉它Executor已经启动了
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}

主要的逻辑如下:

1.记录Worker使用的资源

2.将上述封装的信息发送给Worker,然后让Worker启动Executor

3.最后通知Driver,Executor已经启动了。


推荐阅读
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • 本文详细介绍了Akka中的BackoffSupervisor机制,探讨其在处理持久化失败和Actor重启时的应用。通过具体示例,展示了如何配置和使用BackoffSupervisor以实现更细粒度的异常处理。 ... [详细]
  • 深入解析JVM垃圾收集器
    本文基于《深入理解Java虚拟机:JVM高级特性与最佳实践》第二版,详细探讨了JVM中不同类型的垃圾收集器及其工作原理。通过介绍各种垃圾收集器的特性和应用场景,帮助读者更好地理解和优化JVM内存管理。 ... [详细]
  • 深入解析Android自定义View面试题
    本文探讨了Android Launcher开发中自定义View的重要性,并通过一道经典的面试题,帮助开发者更好地理解自定义View的实现细节。文章不仅涵盖了基础知识,还提供了实际操作建议。 ... [详细]
  • 本文详细介绍了 GWT 中 PopupPanel 类的 onKeyDownPreview 方法,提供了多个代码示例及应用场景,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 主要用了2个类来实现的,话不多说,直接看运行结果,然后在奉上源代码1.Index.javaimportjava.awt.Color;im ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • Explore how Matterverse is redefining the metaverse experience, creating immersive and meaningful virtual environments that foster genuine connections and economic opportunities. ... [详细]
  • 本文介绍如何使用Objective-C结合dispatch库进行并发编程,以提高素数计数任务的效率。通过对比纯C代码与引入并发机制后的代码,展示dispatch库的强大功能。 ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
author-avatar
淡漠初夏0_176
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有