TaskScheduler is an interface: when the DAGScheduler submits a TaskSet to the underlying scheduler, it programs against the TaskScheduler interface rather than a concrete implementation. The core job of the TaskScheduler is to submit TaskSets to the cluster for execution and to report the results back.
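To make that interface boundary concrete, here is an abridged, hedged sketch of the contract. The stand-in Task and TaskSet types are defined inline only so the snippet compiles on its own; the real trait in org.apache.spark.scheduler is private[spark] and has more members (cancelTasks, executorLost, and so on), so treat the member list below as approximate.

// Toy stand-in types so the sketch is self-contained; the real ones are private[spark].
final case class Task(stageId: Int, partitionId: Int)
final case class TaskSet(tasks: Array[Task], stageId: Int, stageAttemptId: Int, priority: Int)

// Abridged view of the contract the DAGScheduler programs against.
trait TaskScheduler {
  def start(): Unit
  def submitTasks(taskSet: TaskSet): Unit  // the entry point analyzed in Step 1 below
  def defaultParallelism(): Int
  def stop(): Unit
}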
Source code analysis
Step 1: submitTasks, the entry point where the TaskScheduler accepts tasks
Source file: org.apache.spark.scheduler.TaskSchedulerImpl.scala
/**
 * Entry point for submitting tasks to the TaskScheduler
 */
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    /**
     * Create one TaskSetManager per TaskSet.
     * The TaskSetManager monitors and manages the execution of its TaskSet,
     * retrying failed tasks until the retry limit (maxTaskFailures) is exceeded.
     */
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    // Look up (or create) the attempt -> TaskSetManager map for this stage
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    // Register the newly created TaskSetManager under this stage attempt
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists {
      case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map { _._2.taskSet.id }.mkString(",")}")
    }
    /**
     * Hand the TaskSetManager over to the scheduling pool. Two policies exist: FIFO and FAIR.
     * Tasks are later assigned to executors according to the executors' free resources
     * and the locality policy. The scheduling data structure is wrapped in the Pool class:
     *
     * - for FIFO, the Pool is simply a queue of TaskSetManagers
     * - for FAIR, it is a tree made up of TaskSetManagers
     *
     * The Pool maintains the priority of the TaskSets; when an executor resource offer
     * (resourceOffer) arrives, TaskSets are dequeued and submitted to executors.
     */
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    // If not running locally and no task has been received yet, start a timer that
    // periodically logs a warning until a task is accepted
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  /**
   * When the TaskScheduler is created, one very important step is that TaskSchedulerImpl
   * creates a StandaloneSchedulerBackend. The backend here is that StandaloneSchedulerBackend
   * (executors register back to it after they start), and it is also responsible for creating
   * the AppClient and registering the Application with the Master.
   */
  backend.reviveOffers()
}
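Before moving on, a hedged illustration of how the "Initial job has not accepted any resources" warning above is usually triggered: the application asks for more per-executor resources than any worker can provide, so no executor registers and hasLaunchedTask never becomes true. The master URL and resource figures below are invented.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("starvation-demo")         // hypothetical application
  .setMaster("spark://master:7077")      // hypothetical standalone master
  .set("spark.executor.memory", "64g")   // more memory than any worker in this imagined cluster
  .set("spark.executor.cores", "32")     // more cores than any worker has
// A job submitted with this conf would sit unscheduled while the starvation timer keeps
// logging the warning every STARVATION_TIMEOUT_MS, until resources are freed or added.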
TaskSetManager: within TaskSchedulerImpl, it schedules the tasks of a single TaskSet. This class tracks every task, retries failed tasks until the retry limit is exceeded, and handles locality-aware scheduling for its TaskSet through delay scheduling. Its main interface is resourceOffer, in which the TaskSet asks to run one of its tasks on a given node; it also receives status-change messages so that it knows when the state of one of its tasks has changed.
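Both of the knobs mentioned so far are ordinary Spark settings: spark.task.maxFailures is the maxTaskFailures retry limit passed to createTaskSetManager, and spark.scheduler.mode selects the FIFO or FAIR pool built by schedulableBuilder. A hedged configuration sketch (application and pool names are invented):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("scheduler-config-demo")   // hypothetical application
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")   // FIFO is the default
  .set("spark.task.maxFailures", "4")    // retry limit used by the TaskSetManager
val sc = new SparkContext(conf)
// With FAIR scheduling, jobs submitted from this thread can be routed into a named pool:
sc.setLocalProperty("spark.scheduler.pool", "highPriority")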
Step 2: the backend.reviveOffers() method (defined in CoarseGrainedSchedulerBackend, the parent class of StandaloneSchedulerBackend)
Source file: org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

var driverEndpoint: RpcEndpointRef = null

protected def minRegisteredRatio: Double = _minRegisteredRatio

override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }
  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}

protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
Step 3: the receive() method of the DriverEndpoint class
override def receive: PartialFunction[Any, Unit] = {
  // A StatusUpdate message reports a task status change
  case StatusUpdate(executorId, taskId, state, data) =>
    // Delegate the update to TaskSchedulerImpl#statusUpdate
    scheduler.statusUpdate(taskId, state, data.value)
    // If the task has reached a terminal state
    if (TaskState.isFinished(state)) {
      // Look up the ExecutorData for this executor id
      executorDataMap.get(executorId) match {
        // If the executor is known
        case Some(executorInfo) =>
          // return the cores the task was using to the executor's free pool
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          // and re-offer this executor's resources to launch more tasks on it
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
  // A ReviveOffers message asks the driver to run the scheduling round again
  case ReviveOffers =>
    // Collect the available executors in the cluster and launch tasks on them
    makeOffers()
  // A KillTask message asks to kill the given task
  case KillTask(taskId, executorId, interruptThread, reason) =>
    executorDataMap.get(executorId) match {
      // Forward the KillTask message to the executor
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(
          KillTask(taskId, executorId, interruptThread, reason))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
  ......
}
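As an aside, what TaskState.isFinished(state) checks above can be mirrored with a small self-contained sketch. The real enumeration is org.apache.spark.TaskState; its member list is reproduced here from memory, so treat it as approximate.

object ToyTaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)
  // A task in any terminal state gives its CPU cores back to the executor's free pool.
  def isFinished(state: Value): Boolean = finishedStates.contains(state)
}

println(ToyTaskState.isFinished(ToyTaskState.RUNNING))   // false
println(ToyTaskState.isFinished(ToyTaskState.FINISHED))  // true -> freeCores returned, makeOffers(executorId)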
Step 4: the makeOffers() method
// makeOffers collects the usable executors and starts launching tasks on them
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  /**
   * 1. Filter out the executors that are still alive.
   * 2. Build a WorkerOffer for each of them and call scheduler.resourceOffers()
   *    to assign tasks onto the executors.
   * 3. Once tasks have been assigned, call launchTasks() to send a LaunchTask
   *    message for every assigned task.
   */
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors under killing
    // While tasks are being launched, make sure no executor is killed underneath them:
    // drop executors that are being killed
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // All executors usable by the application, wrapped as WorkerOffers
    // (one WorkerOffer per executor, describing its free CPU resources)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    // Call scheduler.resourceOffers() to assign tasks onto the executors
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    // Launch the assigned tasks
    launchTasks(taskDescs)
  }
}
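A quick hedged back-of-the-envelope for the slot arithmetic these WorkerOffers feed into: the number of schedulable slots is the sum over alive executors of freeCores / spark.task.cpus. The executor ids, hosts and core counts below are invented.

// Simplified stand-in for WorkerOffer, just to show the arithmetic.
final case class ToyWorkerOffer(executorId: String, host: String, freeCores: Int)

val cpusPerTask = 2   // value of spark.task.cpus in this made-up setup (the default is 1)
val offers = Seq(
  ToyWorkerOffer("exec-1", "host-a", 8),
  ToyWorkerOffer("exec-2", "host-b", 8),
  ToyWorkerOffer("exec-3", "host-c", 8))

val availableSlots = offers.map(_.freeCores / cpusPerTask).sum
println(s"availableSlots = $availableSlots")  // 12: at most 12 tasks can be launched this round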
Step 5: the task assignment algorithm, resourceOffers
For every TaskSetManager, compute its locality levels, then try to launch the TaskSet's tasks on executors at the smallest (best) locality level first. If nothing can be launched there, relax to the next level, and so on, until some locality level succeeds.
The locality level categories are listed in the comments inside the method below.
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  // Walk over the executors that have resources to offer
  for (o <- offers) {
    // If this host has not been seen yet, initialize a set to hold its executors
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    // If this executor is not known yet, register it and notify the DAGScheduler
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
    // For every rack the host belongs to
    for (rack <- getRackForHost(o.host)) {
      // update the rack -> hosts mapping
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // First shuffle the available executors, spreading the load as evenly as possible
  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  /**
   * tasks can be thought of as a two-dimensional ArrayBuffer: one inner ArrayBuffer per
   * executor, sized to the number of task slots on that executor (free cores / CPUS_PER_TASK).
   */
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  // CPU cores actually available on each executor
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
  /**
   * Take the sorted TaskSets out of the rootPool.
   * When the TaskScheduler was initialized, TaskSchedulerImpl.initialize() created a
   * scheduling pool. Every submitted TaskSet first goes into that pool, and when the task
   * assignment algorithm runs, the TaskSets are taken out of the pool in sorted order.
   */
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  // If a new executor has joined, data locality needs to be recomputed
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  /**
   * The core of the task assignment algorithm:
   * a double loop over all TaskSets and over every locality level.
   *
   * Locality levels:
   * 1. PROCESS_LOCAL: the RDD partition and the task are in the same executor (same JVM); fastest.
   * 2. NODE_LOCAL: the RDD partition and the task are not in the same executor (process),
   *    but are on the same worker node.
   * 3. NO_PREF: no locality preference.
   * 4. RACK_LOCAL: the RDD partition and the task are at least on the same rack.
   * 5. ANY: any locality is acceptable.
   */
  for (taskSet <- sortedTaskSets) {
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      var launchedAnyTask = false
      // Record all the executor IDs assigned barrier tasks on.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      // For each TaskSet, start from its best locality level and work downwards
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          /**
           * For the current TaskSet, first try to launch its tasks on executors at the best
           * (smallest) locality level. If nothing can be launched, leave the do-while loop
           * and move to the next, weaker locality level, and so on, until the TaskSet's
           * tasks have all been launched at some level.
           */
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
            taskSet,
            currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      // If the TaskSet could not launch anything at any locality level, it may be
      // completely blacklisted
      if (!launchedAnyTask) {
        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
      }
      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        require(
          addressesWithDescs.size == taskSet.numTasks,
          s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
            s"because only ${addressesWithDescs.size} out of a total number of " +
            s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
            "been blacklisted or cannot fulfill task locality requirements.")
        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()
        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
The main strategy, in short: take the TaskSets from the pool in scheduling order; for each one, keep offering resources at its best locality level until nothing more can be launched, then relax to the next level; and only schedule a barrier TaskSet when there are enough free slots to launch all of its tasks in the same round.
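The taskSet.isBarrier branch above comes from the barrier execution mode introduced in Spark 2.4: all tasks of a barrier stage must be launched in the same scheduling round, which is exactly what the availableSlots and addressesWithDescs checks enforce. A hedged example of producing such a stage (the local master and partition count are arbitrary):

import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("barrier-demo").setMaster("local[4]"))
val doubled = sc.parallelize(1 to 8, numSlices = 4)
  .barrier()                      // marks the stage as a barrier stage (isBarrier = true)
  .mapPartitions { it =>
    val ctx = BarrierTaskContext.get()
    ctx.barrier()                 // global synchronization point across all 4 tasks
    it.map(_ * 2)
  }
  .collect()
println(doubled.mkString(","))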
Step 6: the resourceOfferSingleTaskSet() method
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
    addressesWithDescs: ArrayBuffer[(String, TaskDescription)]): Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  // Walk over all executors
  for (i <- 0 until shuffledOffers.size) {
    // Get the executor id and host
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Only if this executor has at least CPUS_PER_TASK (default 1) free cores
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // Call resourceOffer to find which of this TaskSet's tasks can be launched on this
        // executor at the current locality level, and iterate over them
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          // Record the task in the per-executor task list, i.e. the tasks this executor will run
          tasks(i) += task
          // Record the assignment in the in-memory bookkeeping structures
          val tid = task.taskId
          taskIdToTaskSetManager.put(tid, taskSet)
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          // Only update hosts for a barrier task.
          if (taskSet.isBarrier) {
            // The executor address is expected to be non empty.
            addressesWithDescs += (shuffledOffers(i).address.get -> task)
          }
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
Step 7: the resourceOffer() method
/**
 * Checks how long this TaskSet has already waited at the locality levels up to the requested one.
 * If the wait is still within the allowed window for a level, tasks at that locality level are
 * allowed to start on this executor.
 */
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality // the requested locality, e.g. PROCESS_LOCAL
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime) // locality currently allowed by delay scheduling
      if (allowedLocality > maxLocality) {
        // If the allowed locality is weaker than the requested one, keep the requested one.
        // E.g. if getAllowedLocalityLevel returns NODE_LOCAL, which is weaker than the
        // requested PROCESS_LOCAL, stick with PROCESS_LOCAL.
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    ......
}
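A side note on why allowedLocality > maxLocality reads as "weaker than requested": TaskLocality is a Scala Enumeration declared from best to worst, so its values compare by declaration order. A toy mirror of that ordering (the real one is org.apache.spark.scheduler.TaskLocality):

object ToyTaskLocality extends Enumeration {
  // Declared best-first, so a smaller value means better locality.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}
import ToyTaskLocality._

assert(PROCESS_LOCAL < NODE_LOCAL)  // better locality compares as "smaller"
assert(NODE_LOCAL > PROCESS_LOCAL)  // "greater" means a weaker (farther away) level
println("ordering check passed")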
Step 8: the getAllowedLocalityLevel() method
/**
 * Recomputes the highest locality level allowed at the current time. Because of delay
 * scheduling, the current level has to be derived from the wait-time based delay
 * scheduling algorithm.
 */
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Remove the scheduled or finished tasks lazily
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }
  // Walk through the list of tasks that can be scheduled at each location and returns true
  // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
  // already been scheduled.
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // The key could be executorId, host or rackId
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // This is a performance optimization: if there are no more tasks that can
      // be scheduled at a particular locality level, there is no point in waiting
      // for the locality wait timeout (SPARK-4939).
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality
      // wait timer doesn't immediately expire
      lastLaunchTime += localityWaits(currentLocalityIndex)
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
      currentLocalityIndex += 1
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}
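The localityWaits(currentLocalityIndex) values used above come from configuration. A hedged sketch of the settings that control how long delay scheduling waits at each level before relaxing (3s is the documented default for spark.locality.wait):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before moving to a less-local level
  .set("spark.locality.wait.process", "3s")  // override for leaving PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // override for leaving NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // override for leaving RACK_LOCAL
// Setting these to "0s" effectively disables delay scheduling; larger values trade scheduling
// latency for better data locality.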
Step 9: the dequeueTask() method
/**
 * Dequeue a pending task, handling each locality level differently.
 */
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  // dequeueTaskFromList(): removes a pending task from the given list and returns its index,
  // or None if the list is empty.
  // PROCESS_LOCAL: the data is in the same JVM, i.e. on the same executor. Best possible locality.
  for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }
  // NODE_LOCAL: the data is on the same node, e.g. in another executor on that node, or in an
  // HDFS block that happens to live on the node. Slightly slower than PROCESS_LOCAL because the
  // data has to cross process boundaries or be read from a file.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }
  // NO_PREF: the data is equally fast to access from anywhere; no locality preference.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }
  // RACK_LOCAL: the data is on a different node in the same rack; it must travel over the
  // network and through file IO, so this is slower than NODE_LOCAL.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }
  // ANY: the data is somewhere on the network, not in the same rack; slowest.
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }
  // find a speculative task if all others tasks have been scheduled
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true) }
}
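Where do the per-executor, per-host and per-rack pending-task lists come from in the first place? From each task's preferred locations, which are derived from its partition's preferredLocations; the level actually used for a launched task also shows up in the "Locality Level" column of the stage page in the Spark UI. A hedged local sketch (host names are made up):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("locality-prefs-demo").setMaster("local[2]"))
// makeRDD accepts per-element location preferences (hostnames of Spark nodes).
val rdd = sc.makeRDD(Seq(
  (1, Seq("host-a")),   // partition 0 prefers host-a
  (2, Seq("host-b"))    // partition 1 prefers host-b
))
println(rdd.preferredLocations(rdd.partitions(0)))  // List(host-a)
println(rdd.preferredLocations(rdd.partitions(1)))  // List(host-b)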
Step 10: launchTasks, submitting the tasks to their executors
/**
 * Launch the tasks on the executors they were assigned to.
 */
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // First serialize every task that is going to run
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      // Find the executor the task was assigned to
      val executorData = executorDataMap(task.executorId)
      // Subtract the CPU cores the task will use from the executor's free cores
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // Send a LaunchTask message to the executor to start the task there
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
Each task is serialized first. If the serialized task exceeds the maximum RPC message size (spark.rpc.message.maxSize, 128 MB by default), the corresponding TaskSetManager is aborted, which marks it as zombie and fails the stage. Otherwise a LaunchTask message carrying the serialized task is sent to the executor, which then runs the task.
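When a task does exceed the limit, the error message in the code suggests two remedies: raise spark.rpc.message.maxSize, or stop putting large values into what gets serialized with the task and broadcast them instead, so they are shipped once per executor rather than with every task. A hedged sketch of the broadcast approach (the lookup table is invented):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-demo").setMaster("local[2]"))
// A large value that we do NOT want serialized along with every task.
val lookupTable: Map[Int, String] = (1 to 100000).map(i => i -> s"value-$i").toMap
val bcast = sc.broadcast(lookupTable)   // shipped once per executor, referenced by the tasks

val result = sc.parallelize(1 to 10)
  .map(i => bcast.value.getOrElse(i, "missing"))
  .collect()
println(result.mkString(","))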