本节课内容:
1. TaskScheduler工作原理
2. TaskScheduler源码
总体调度图:
通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理。
回顾:
DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程;运行时从前往后运行的。每个Stage中有很多任务Task,Task是可以并行执行的。它们的执行逻辑完全相同的,只不过是处理的数据不同而已,DAGScheduler通过TaskSet的方式,把其构造的所有Task提交给底层调度器TaskScheduler。
& TaskScheduler是一个trait,与具体的资源调度解耦合,这符合面向对象中依赖抽象不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行的众多的资源调度模式上,例如:StandAlone、Yarn、Mesos、Local、EC2或者其他自定义的资源调度器。
在StandAlone模式下,我们来看看TaskScheduler的一个实现TaskSchedulerImpl。
TaskScheduler的核心任务是提交TaskSet到集群并汇报结果,主要负责Application的不同Job之间的调度。
具体来讲有以下几点:
(1) 为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息;
(2) Task执行失败时启动重试机制,以及遇到Straggle任务会在其他节点上启动备份任务;
(3) 向DAGScheduler汇报执行情况,包括在shuffle输出丢失的时候报告fetch failed错误等信息。
(1)注册当前程序。
& TaskScheduler内部会握有SchedulerBackend引用,SchedulerBackend是一个trait,它主要负责管理Executor资源,从StandAlone模式来讲,具体实现是SparkDeploySchedulerBackend。SparkDeploySchedulerBackend在启动时会构造AppClient实例并在该实例start的时候启动ClientEndpoint(消息循环体),ClientEndpoint在启动时会向Master注册当前程序。
(1) 注册Executor信息。
& SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend会在Start的时候实例化一个类型DriverEndpoint的消息循环体。DriverEndpoint就是我们程序运行时的Driver对象。SparkDeploySchedulerBackend是专门给来负责收集Worker上的资源信息,当ExecutorBackend启动的时候会发送RegisteredExecutor信息向Driver中的DriverBackend进行注册。(可以参考前几讲的Master注册部分。)此时SparkDeploySchedulerBackend就掌握了当前应用应用程序所拥有的计算资源,TaskScheduler就是通过SparkDeploySchedulerBackend拥有的计算资源来具体运行Task。
& 补充:SparkContext、DAGScheduler、TaskSchedulerImpl、SparkDeploySchedulerBackend在应用程序启动的时候值实例化一次,应用程序存在期间始终存在这些对象。SparkDeploySchedulerBackend是一个辅助类,主要是帮助TaskSchedulerImpl中的Task获取计算资源和发送Task到集群中
TaskScheduler是在SparkContext实例化时进行实例化的,如TaskSchedulerImpl的实例化。(Spark1.6.0 SparkContext.scala #521-#526)
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
//省略部分代码
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
//利用SparkDeploySchedulerBackend来初始化TaskScheduler
scheduler.initialize(backend) //1
(backend, scheduler)
}
//1被处调用
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0) //2
//根据rootPool中的算法创建可调度对象
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
//FIFO模式
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
//Fair模式
new FairSchedulableBuilder(rootPool, conf)
}
}
//创建调度池
schedulableBuilder.buildPools()
}创建调度池
(1)创建rootPool(实现调度算法)//2处被调用
private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
extends Schedulable
with Logging {
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
var runningTasks = 0
var priority = 0
// A pool's stage id is used to break the tie in scheduling.
var stageId = -1
var name = poolName
var parent: Pool = null
//根据不同的调度算法创建调度算法的实例
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
}
}(2)创建可调度对象
Org.apache.spark.scheduler.Pool包含了一组可以调度的实体。对于FIFO来说,rootPool包含了一组TaskSetManager;而对于FAIR来说,rootPool包含了一组Pool,这些Pool构成了一颗调度树,其中这棵树的叶子节点就是TaskSetManager。
(3)创建调度池
schedulableBuilder.buildPools()因调度方式而异,如果是FIFO,它的实现是空的如下:
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)因为rootPool并没有包含Pool,而是直接包含TaskSetManager:submitTasks直接将TaskSetManager添加到rootPool(调度队列,队列默认是先入先出)即可。
extends SchedulableBuilder with Logging {
override def buildPools() {
// nothing
}
//定义了如何将TaskSetManager加入到调度池中
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
rootPool.addSchedulable(manager) //3
}
}
//将可调度对象加入到调度队列 3处被调用
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}
而FAIR模式则需要在运行前先进行一定的配置。它需要在rootPool的基础上根据这个配置文件来构建这颗调度树。
具体实现见如下代码:override def buildPools() {
var is: Option[InputStream] = None
try {
is = Option {
//以spark.scheduler.allocation.file设置的文件名字来创建FileInputStream
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
//若spark.Scheduler.allocation.file没有设置,则直接以fairscheduler.xml创建
//FileInputStream
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
}
//以is对应的内容创建Pool
is.foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close())
}
创建名为“default”的Pool
buildDefaultPool()
}
(4)调度算法
private[spark] traitSchedulingAlgorithm {
def comparator(s1: Schedulable, s2:Schedulable): Boolean
}
从代码来看调度算法是一个trait,需要子类实现。其实质就是封装了一个比较函数。子类只需实现这个比较函数即可。
(a)FIFO
采用FIFO任务调度的顺序:
首先要保证JobID较小的先被调度,如果是同一个Job,那么StageID小的先被调度(同一个Job,可能多个Stage可以并行执行,比如Stage划分过程中Stage0和Stage1)。
调度算法:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {(2)FAIR
//比较可调度对象s1与s2,这里s1与s2其实就是TaskSetManager。
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority //这个priority实际上就是Job ID
val priority2 = s2.priority //同上
var res = math.signum(priority1 - priority2) //首先比较Job ID
if (res == 0) { //如果Job ID相同,那么比较Stage ID
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res <0) {
true
} else {
false
}
}
}
对于FAIR,首先是挂在rootPool下面的pool先确定调度顺序,然后在每个pool内部使用相同的算法来确定TaskSetManager的调度顺序。
算法实现:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
//最小共享,可以理解为执行需要的最小资源即CPU核数,其他相同时,所需最小核数小的优先
//调度
val minShare1 = s1.minShare
val minShare2 = s2.minShare
//运行的任务的数量
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
//查看是否有调度队列处于饥饿状态,看可分配的核数是否少于任务数,如果资源不够用,那么
//处于挨饿状态
val s1Needy = runningTasks1val s2Needy = runningTasks2 //计算ShareRatio, 最小资源占用比例,这里可以理解为偏向任务较轻的
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
//计算Task的Weight比重即权重,任务数相同,权重高的优先
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare: Int = 0
//首先处于饥饿优先
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
//都处于挨饿状态则,需要资源占用比小的优先
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
//都不挨饿,则比较权重比,比例低的优先
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare <0) {
true
} else if (compare > 0) {
false
} else {
//如果都一样,那么比较名字,按照字母顺序比较,所以名字比较重要
s1.name}
}
}
注:
& 公平原则本着的原则就是谁最需要就给谁,所以挨饿者优先;
& 资源占用比这块有点费解,如果把他理解成一个贪心问题就容易理解了。对于都是出于挨饿状态的任务可以这样理解,负载大的即时给你资源你也不一定能有效缓解,莫不如给负载小的,让其快速使用,完成后可以释放更多的资源,这是一种贪心策略。如JobA和JobB的Task数量相同都是10,A的minShare是2,B的是5,那占用比为5和2,显然B的占用比更小,贪心的策略应该给B先调度处理;
& 对于都处于满足状态的,当然谁的权重有着更好的决定性,权重比低得优先(偏向权利大的);
& 如果所有上述的比较都相同,那么名字字典排序靠前的优先(哈哈,名字很重要哦);名字aaa要比abc优先,所以这里在给Pool或者TaskSetManager起名字的时候要考虑这一点
(备注来源:https://yq.aliyun.com/articles/6041)
补充:这两种调度的排序算法针对的可比较对象都是Schedule的具体对象,这里我们对这个对象Schedulable做简单的解释。
前面讲到,Schedulable可调度对象在Spark有两种形式:Pool和TaskSetManager。Pool是一个调度池,Pool里面还可以有子Pool,Spark中的rootPool即根节点默认是一个无名(default)的Pool。对于FIFO和FAIR有不同的层次。
对于FIFO模式的调度,rootPool管理的直接就是TaskSetManager,没有子Pool这个概念,就只有两层,rootPool和叶子节点TaskSetManager,实现如下所示。
但对于FAIR这种模式来说,是三层的,根节点是rootPool,为无名Pool,下一层为用户定义的Pool(不指定名称默认名称为default),再下一层才是TaskSetManager,即根调度池管理一组调度池,每个调度池管理自己的TaskSetManager,其实现如下所示。
这里的调度顺序是指在一个SparkContext之内的调度,一般情况下我们自行使用是不太会需要Pool这个概念的,因为不存在Pool之间的竞争,但如果我们提供一个Spark应用,大家都可以提交任务,服务端有一个常驻的任务,对应一个SparkContext,每个用户提交的任务都由其代理执行,那么针对每个用户提交的任务可以按照用户等级和任务优先级设置一个Pool,这样不同的用户的Pool之间就存在竞争关系了,可以用Pool的优先级来区分任务和用户的优先级了,**但要再强调一点名字很重要,因为FAIR机制中,如果其他比较无法判断,那么会按照名字来进行字典排序的**。(补充来源:https://yq.aliyun.com/articles/6041)
//SparkContext.scala
525) _dagScheduler = new DAGScheduler(this)
526) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
527)
528) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
529) // constructor
530) _taskScheduler.start()
//TaskSchedulerImpl.scala
override def start() {
//启动SparkDeploySchedulerBackend
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
override def start() {在client#start方法中最终会注册应用程序。
super.start()
launcherBackend.connect()
// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
//定义分配资源的进程名称
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
//省略部分代码,详细内容参见前几节,Executor注册过程。
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
//注册应用程序
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
Task运行各个阶段交互过程
图35-6 资源分配过程
备注:有关FIFO、FAIR调度算法解析部分参考 张安站 --Spark技术内幕一书
-----------------------------------------EOF---------------------------------------------------------------------------------------------------------