热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

35Spark系统运行循环流程

本节课内容:1.TaskScheduler工作原理2.TaskScheduler源码一、TaskScheduler工作原理总体调度图:通过前几节课的讲

本节课内容:

1.     TaskScheduler工作原理

2.     TaskScheduler源码

 

一、TaskScheduler工作原理

       总体调度图:


       通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理。

       回顾:

       DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程;运行时从前往后运行的。每个Stage中有很多任务Task,Task是可以并行执行的。它们的执行逻辑完全相同的,只不过是处理的数据不同而已,DAGScheduler通过TaskSet的方式,把其构造的所有Task提交给底层调度器TaskScheduler。

TaskScheduler是一个trait,与具体的资源调度解耦合,这符合面向对象中依赖抽象不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行的众多的资源调度模式上,例如:StandAlone、Yarn、Mesos、Local、EC2或者其他自定义的资源调度器。

在StandAlone模式下,我们来看看TaskScheduler的一个实现TaskSchedulerImpl。

1.TaskScheduler的核心任务

TaskScheduler的核心任务是提交TaskSet到集群并汇报结果,主要负责Application的不同Job之间的调度。

具体来讲有以下几点:

(1)    为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息;

(2)    Task执行失败时启动重试机制,以及遇到Straggle任务会在其他节点上启动备份任务;

(3)    向DAGScheduler汇报执行情况,包括在shuffle输出丢失的时候报告fetch failed错误等信息。

2.TaskScheduler的核心功能

       (1)注册当前程序。

TaskScheduler内部会握有SchedulerBackend引用,SchedulerBackend是一个trait,它主要负责管理Executor资源,从StandAlone模式来讲,具体实现是SparkDeploySchedulerBackend。SparkDeploySchedulerBackend在启动时会构造AppClient实例并在该实例start的时候启动ClientEndpoint(消息循环体),ClientEndpoint在启动时会向Master注册当前程序。

(1)   注册Executor信息。

SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend会在Start的时候实例化一个类型DriverEndpoint的消息循环体。DriverEndpoint就是我们程序运行时的Driver对象。SparkDeploySchedulerBackend是专门给来负责收集Worker上的资源信息,当ExecutorBackend启动的时候会发送RegisteredExecutor信息向Driver中的DriverBackend进行注册。(可以参考前几讲的Master注册部分。)此时SparkDeploySchedulerBackend就掌握了当前应用应用程序所拥有的计算资源,TaskScheduler就是通过SparkDeploySchedulerBackend拥有的计算资源来具体运行Task。

补充:SparkContext、DAGScheduler、TaskSchedulerImpl、SparkDeploySchedulerBackend在应用程序启动的时候值实例化一次,应用程序存在期间始终存在这些对象。SparkDeploySchedulerBackend是一个辅助类,主要是帮助TaskSchedulerImpl中的Task获取计算资源和发送Task到集群中

3.TaskScheduler的实例化时机

       TaskScheduler是在SparkContext实例化时进行实例化的,如TaskSchedulerImpl的实例化。(Spark1.6.0  SparkContext.scala  #521-#526)

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

在SparkContext#createTaskScheduler方法中会创建TaskScheduler和SparkDeploySchedulerBackend:
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
//省略部分代码
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
//利用SparkDeploySchedulerBackend来初始化TaskScheduler
scheduler.initialize(backend) //1
(backend, scheduler)
}

4.TaskScheduler初始化

//1被处调用
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0) //2
//根据rootPool中的算法创建可调度对象
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
//FIFO模式
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
//Fair模式
new FairSchedulableBuilder(rootPool, conf)
}
}
//创建调度池
schedulableBuilder.buildPools()
}


创建调度池

       (1)创建rootPool(实现调度算法)
//2处被调用
private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
extends Schedulable
with Logging {

val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
var runningTasks = 0
var priority = 0

// A pool's stage id is used to break the tie in scheduling.
var stageId = -1
var name = poolName
var parent: Pool = null
//根据不同的调度算法创建调度算法的实例
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
}
}

(2)创建可调度对象

Org.apache.spark.scheduler.Pool包含了一组可以调度的实体。对于FIFO来说,rootPool包含了一组TaskSetManager;而对于FAIR来说,rootPool包含了一组Pool,这些Pool构成了一颗调度树,其中这棵树的叶子节点就是TaskSetManager。

(3)创建调度池

schedulableBuilder.buildPools()因调度方式而异,如果是FIFO,它的实现是空的如下:

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
override def buildPools() {
// nothing
}
//定义了如何将TaskSetManager加入到调度池中
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
rootPool.addSchedulable(manager) //3
}
}
因为rootPool并没有包含Pool,而是直接包含TaskSetManager:submitTasks直接将TaskSetManager添加到rootPool(调度队列,队列默认是先入先出)即可
 //将可调度对象加入到调度队列  3处被调用
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}

       而FAIR模式则需要在运行前先进行一定的配置。它需要在rootPool的基础上根据这个配置文件来构建这颗调度树。

具体实现见如下代码:
override def buildPools() {
var is: Option[InputStream] = None
try {
is = Option {
//以spark.scheduler.allocation.file设置的文件名字来创建FileInputStream
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
//若spark.Scheduler.allocation.file没有设置,则直接以fairscheduler.xml创建
//FileInputStream
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
}
//以is对应的内容创建Pool
is.foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close())
}
创建名为“default”的Pool
buildDefaultPool()
}

(4)调度算法

private[spark] traitSchedulingAlgorithm {
def comparator(s1: Schedulable, s2:Schedulable): Boolean
}

从代码来看调度算法是一个trait,需要子类实现。其实质就是封装了一个比较函数。子类只需实现这个比较函数即可。

(a)FIFO

采用FIFO任务调度的顺序:

首先要保证JobID较小的先被调度,如果是同一个Job,那么StageID小的先被调度(同一个Job,可能多个Stage可以并行执行,比如Stage划分过程中Stage0和Stage1)。


调度算法:

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
//比较可调度对象s1与s2,这里s1与s2其实就是TaskSetManager。
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority //这个priority实际上就是Job ID
val priority2 = s2.priority //同上
var res = math.signum(priority1 - priority2) //首先比较Job ID
if (res == 0) { //如果Job ID相同,那么比较Stage ID
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res <0) {
true
} else {
false
}
}
}
(2)FAIR

对于FAIR,首先是挂在rootPool下面的pool先确定调度顺序,然后在每个pool内部使用相同的算法来确定TaskSetManager的调度顺序。


算法实现:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
//最小共享,可以理解为执行需要的最小资源即CPU核数,其他相同时,所需最小核数小的优先
//调度
val minShare1 = s1.minShare
val minShare2 = s2.minShare
//运行的任务的数量
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
//查看是否有调度队列处于饥饿状态,看可分配的核数是否少于任务数,如果资源不够用,那么
//处于挨饿状态
val s1Needy = runningTasks1 val s2Needy = runningTasks2 //计算ShareRatio, 最小资源占用比例,这里可以理解为偏向任务较轻的
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
//计算Task的Weight比重即权重,任务数相同,权重高的优先
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare: Int = 0
//首先处于饥饿优先
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
//都处于挨饿状态则,需要资源占用比小的优先
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
//都不挨饿,则比较权重比,比例低的优先
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}

if (compare <0) {
true
} else if (compare > 0) {
false
} else {
//如果都一样,那么比较名字,按照字母顺序比较,所以名字比较重要
s1.name }
}
}

注:

公平原则本着的原则就是谁最需要就给谁,所以挨饿者优先;

资源占用比这块有点费解,如果把他理解成一个贪心问题就容易理解了。对于都是出于挨饿状态的任务可以这样理解,负载大的即时给你资源你也不一定能有效缓解,莫不如给负载小的,让其快速使用,完成后可以释放更多的资源,这是一种贪心策略。如JobA和JobB的Task数量相同都是10,A的minShare是2,B的是5,那占用比为5和2,显然B的占用比更小,贪心的策略应该给B先调度处理;

对于都处于满足状态的,当然谁的权重有着更好的决定性,权重比低得优先(偏向权利大的);

如果所有上述的比较都相同,那么名字字典排序靠前的优先(哈哈,名字很重要哦);名字aaa要比abc优先,所以这里在给Pool或者TaskSetManager起名字的时候要考虑这一点

 (备注来源:https://yq.aliyun.com/articles/6041)

 补充:这两种调度的排序算法针对的可比较对象都是Schedule的具体对象,这里我们对这个对象Schedulable做简单的解释。

       前面讲到,Schedulable可调度对象在Spark有两种形式:Pool和TaskSetManager。Pool是一个调度池,Pool里面还可以有子Pool,Spark中的rootPool即根节点默认是一个无名(default)的Pool。对于FIFO和FAIR有不同的层次。

       对于FIFO模式的调度,rootPool管理的直接就是TaskSetManager,没有子Pool这个概念,就只有两层,rootPool和叶子节点TaskSetManager,实现如下所示。


但对于FAIR这种模式来说,是三层的,根节点是rootPool,为无名Pool,下一层为用户定义的Pool(不指定名称默认名称为default),再下一层才是TaskSetManager,即根调度池管理一组调度池,每个调度池管理自己的TaskSetManager,其实现如下所示。



这里的调度顺序是指在一个SparkContext之内的调度,一般情况下我们自行使用是不太会需要Pool这个概念的,因为不存在Pool之间的竞争,但如果我们提供一个Spark应用,大家都可以提交任务,服务端有一个常驻的任务,对应一个SparkContext,每个用户提交的任务都由其代理执行,那么针对每个用户提交的任务可以按照用户等级和任务优先级设置一个Pool,这样不同的用户的Pool之间就存在竞争关系了,可以用Pool的优先级来区分任务和用户的优先级了,**但要再强调一点名字很重要,因为FAIR机制中,如果其他比较无法判断,那么会按照名字来进行字典排序的**。(补充来源:https://yq.aliyun.com/articles/6041)

5.创建DAGScheduler,调用TaskScheduler#start方法(SparkContext初始化过程中)

//SparkContext.scala
525) _dagScheduler = new DAGScheduler(this)
526) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
527)
528) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
529) // constructor
530) _taskScheduler.start()


//TaskSchedulerImpl.scala
override def start() {
//启动SparkDeploySchedulerBackend
backend.start()

if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}


6.启动Executor

override def start() {
super.start()
launcherBackend.connect()

// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
//定义分配资源的进程名称
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
//省略部分代码,详细内容参见前几节,Executor注册过程。
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
//注册应用程序
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
在client#start方法中最终会注册应用程序。

二、总结

在SparkContext实例化的时候调用createTaskScheduler来创建TaskSchedulerImpl和SparkDeploySchedulerBackend,同时在SparkContext实例化的时候会调用TaskSchedulerImpl#start方法,在该方法中会调用SparkDeploySchedulerBackend#start;在这个start方法中会创建AppClient对象并调用AppClient#start方法,这时会创建ClientEndpoint,在创建ClientEndpoint时会传入来指定具体为当前应用程序启动的Executor进程的入口类的名称为CoarseGrainedExecutorBackend,然后ClientEndpoint启动并通过tryRegisterMaster来注册当前的应用程序到Master中,Master接收到注册信息后,如果可以运行程序,则会为该程序生成JobID并通过Schedule来分配计算资源,具体计算资源分配是通过应用程序运行方式、Memory、core等配置信息来决定的,最后Master会发送指令给Worker;Worker中为当前应用程序分配计算资源时,首先分配ExecutorRunner,ExecutorRunner内部会通过Thread的方式构建ProcessBuilder来启动另一个JVM进程,这个JVM进程启动时候会加载的main方法所在的类的名称就是创建ClientEndpoint传入的Command指定的入口类CoarseGrainedExecutorBackend,此时JVM在通过ProcessBuilder启动的时候获得了CoarseGrainedExecutorBackend后,加载并调用其中的main方法。在main方法中会实例化CoarseGrainedExecutorBackend本身这个消息循环体,而其实例化时会通过回调onStart向DriverEndpoint发送RegisterExecutor来注册当前的CoarseGrainedExecutorBackend,此时DriverEndpoint收到该注册信息并保存在了SparkDeployScheduler实例的内存数据结构中,这样Driver就获得了计算资源!


Task运行各个阶段交互过程





                                                                   图35-6 资源分配过程





备注:有关FIFO、FAIR调度算法解析部分参考 张安站 --Spark技术内幕一书


-----------------------------------------EOF---------------------------------------------------------------------------------------------------------




推荐阅读
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了logistic回归(线性和非线性)相关的知识,包括线性logistic回归的代码和数据集的分布情况。希望对你有一定的参考价值。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • Python爬虫中使用正则表达式的方法和注意事项
    本文介绍了在Python爬虫中使用正则表达式的方法和注意事项。首先解释了爬虫的四个主要步骤,并强调了正则表达式在数据处理中的重要性。然后详细介绍了正则表达式的概念和用法,包括检索、替换和过滤文本的功能。同时提到了re模块是Python内置的用于处理正则表达式的模块,并给出了使用正则表达式时需要注意的特殊字符转义和原始字符串的用法。通过本文的学习,读者可以掌握在Python爬虫中使用正则表达式的技巧和方法。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文详细介绍了Python中正则表达式和re模块的使用方法。首先解释了转义符的作用,以及如何在字符串中包含特殊字符。然后介绍了re模块的功能和常用方法。通过学习本文,读者可以掌握正则表达式的基本概念和使用技巧,进一步提高Python编程能力。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • HashMap的扩容知识详解
    本文详细介绍了HashMap的扩容知识,包括扩容的概述、扩容条件以及1.7版本中的扩容方法。通过学习本文,读者可以全面了解HashMap的扩容机制,提升对HashMap的理解和应用能力。 ... [详细]
  • 如何优化Webpack打包后的代码分割
    本文介绍了如何通过优化Webpack的代码分割来减小打包后的文件大小。主要包括拆分业务逻辑代码和引入第三方包的代码、配置Webpack插件、异步代码的处理、代码分割重命名、配置vendors和cacheGroups等方面的内容。通过合理配置和优化,可以有效减小打包后的文件大小,提高应用的加载速度。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
author-avatar
beat_小然
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有