热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

35Spark系统运行循环流程

本节课内容:1.TaskScheduler工作原理2.TaskScheduler源码一、TaskScheduler工作原理总体调度图:通过前几节课的讲

本节课内容:

1.     TaskScheduler工作原理

2.     TaskScheduler源码

 

一、TaskScheduler工作原理

       总体调度图:


       通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理。

       回顾:

       DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程;运行时从前往后运行的。每个Stage中有很多任务Task,Task是可以并行执行的。它们的执行逻辑完全相同的,只不过是处理的数据不同而已,DAGScheduler通过TaskSet的方式,把其构造的所有Task提交给底层调度器TaskScheduler。

TaskScheduler是一个trait,与具体的资源调度解耦合,这符合面向对象中依赖抽象不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行的众多的资源调度模式上,例如:StandAlone、Yarn、Mesos、Local、EC2或者其他自定义的资源调度器。

在StandAlone模式下,我们来看看TaskScheduler的一个实现TaskSchedulerImpl。

1.TaskScheduler的核心任务

TaskScheduler的核心任务是提交TaskSet到集群并汇报结果,主要负责Application的不同Job之间的调度。

具体来讲有以下几点:

(1)    为TaskSet创建和维护一个TaskSetManager并追踪任务的本地性以及错误信息;

(2)    Task执行失败时启动重试机制,以及遇到Straggle任务会在其他节点上启动备份任务;

(3)    向DAGScheduler汇报执行情况,包括在shuffle输出丢失的时候报告fetch failed错误等信息。

2.TaskScheduler的核心功能

       (1)注册当前程序。

TaskScheduler内部会握有SchedulerBackend引用,SchedulerBackend是一个trait,它主要负责管理Executor资源,从StandAlone模式来讲,具体实现是SparkDeploySchedulerBackend。SparkDeploySchedulerBackend在启动时会构造AppClient实例并在该实例start的时候启动ClientEndpoint(消息循环体),ClientEndpoint在启动时会向Master注册当前程序。

(1)   注册Executor信息。

SparkDeploySchedulerBackend的父类CoarseGrainedSchedulerBackend会在Start的时候实例化一个类型DriverEndpoint的消息循环体。DriverEndpoint就是我们程序运行时的Driver对象。SparkDeploySchedulerBackend是专门给来负责收集Worker上的资源信息,当ExecutorBackend启动的时候会发送RegisteredExecutor信息向Driver中的DriverBackend进行注册。(可以参考前几讲的Master注册部分。)此时SparkDeploySchedulerBackend就掌握了当前应用应用程序所拥有的计算资源,TaskScheduler就是通过SparkDeploySchedulerBackend拥有的计算资源来具体运行Task。

补充:SparkContext、DAGScheduler、TaskSchedulerImpl、SparkDeploySchedulerBackend在应用程序启动的时候值实例化一次,应用程序存在期间始终存在这些对象。SparkDeploySchedulerBackend是一个辅助类,主要是帮助TaskSchedulerImpl中的Task获取计算资源和发送Task到集群中

3.TaskScheduler的实例化时机

       TaskScheduler是在SparkContext实例化时进行实例化的,如TaskSchedulerImpl的实例化。(Spark1.6.0  SparkContext.scala  #521-#526)

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

在SparkContext#createTaskScheduler方法中会创建TaskScheduler和SparkDeploySchedulerBackend:
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
import SparkMasterRegex._
//省略部分代码
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
//利用SparkDeploySchedulerBackend来初始化TaskScheduler
scheduler.initialize(backend) //1
(backend, scheduler)
}

4.TaskScheduler初始化

//1被处调用
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0) //2
//根据rootPool中的算法创建可调度对象
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
//FIFO模式
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
//Fair模式
new FairSchedulableBuilder(rootPool, conf)
}
}
//创建调度池
schedulableBuilder.buildPools()
}


创建调度池

       (1)创建rootPool(实现调度算法)
//2处被调用
private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
extends Schedulable
with Logging {

val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
var runningTasks = 0
var priority = 0

// A pool's stage id is used to break the tie in scheduling.
var stageId = -1
var name = poolName
var parent: Pool = null
//根据不同的调度算法创建调度算法的实例
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
}
}

(2)创建可调度对象

Org.apache.spark.scheduler.Pool包含了一组可以调度的实体。对于FIFO来说,rootPool包含了一组TaskSetManager;而对于FAIR来说,rootPool包含了一组Pool,这些Pool构成了一颗调度树,其中这棵树的叶子节点就是TaskSetManager。

(3)创建调度池

schedulableBuilder.buildPools()因调度方式而异,如果是FIFO,它的实现是空的如下:

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
override def buildPools() {
// nothing
}
//定义了如何将TaskSetManager加入到调度池中
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
rootPool.addSchedulable(manager) //3
}
}
因为rootPool并没有包含Pool,而是直接包含TaskSetManager:submitTasks直接将TaskSetManager添加到rootPool(调度队列,队列默认是先入先出)即可
 //将可调度对象加入到调度队列  3处被调用
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}

       而FAIR模式则需要在运行前先进行一定的配置。它需要在rootPool的基础上根据这个配置文件来构建这颗调度树。

具体实现见如下代码:
override def buildPools() {
var is: Option[InputStream] = None
try {
is = Option {
//以spark.scheduler.allocation.file设置的文件名字来创建FileInputStream
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
//若spark.Scheduler.allocation.file没有设置,则直接以fairscheduler.xml创建
//FileInputStream
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
}
//以is对应的内容创建Pool
is.foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close())
}
创建名为“default”的Pool
buildDefaultPool()
}

(4)调度算法

private[spark] traitSchedulingAlgorithm {
def comparator(s1: Schedulable, s2:Schedulable): Boolean
}

从代码来看调度算法是一个trait,需要子类实现。其实质就是封装了一个比较函数。子类只需实现这个比较函数即可。

(a)FIFO

采用FIFO任务调度的顺序:

首先要保证JobID较小的先被调度,如果是同一个Job,那么StageID小的先被调度(同一个Job,可能多个Stage可以并行执行,比如Stage划分过程中Stage0和Stage1)。


调度算法:

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
//比较可调度对象s1与s2,这里s1与s2其实就是TaskSetManager。
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority //这个priority实际上就是Job ID
val priority2 = s2.priority //同上
var res = math.signum(priority1 - priority2) //首先比较Job ID
if (res == 0) { //如果Job ID相同,那么比较Stage ID
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res <0) {
true
} else {
false
}
}
}
(2)FAIR

对于FAIR,首先是挂在rootPool下面的pool先确定调度顺序,然后在每个pool内部使用相同的算法来确定TaskSetManager的调度顺序。


算法实现:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
//最小共享,可以理解为执行需要的最小资源即CPU核数,其他相同时,所需最小核数小的优先
//调度
val minShare1 = s1.minShare
val minShare2 = s2.minShare
//运行的任务的数量
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
//查看是否有调度队列处于饥饿状态,看可分配的核数是否少于任务数,如果资源不够用,那么
//处于挨饿状态
val s1Needy = runningTasks1 val s2Needy = runningTasks2 //计算ShareRatio, 最小资源占用比例,这里可以理解为偏向任务较轻的
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
//计算Task的Weight比重即权重,任务数相同,权重高的优先
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare: Int = 0
//首先处于饥饿优先
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
//都处于挨饿状态则,需要资源占用比小的优先
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
//都不挨饿,则比较权重比,比例低的优先
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}

if (compare <0) {
true
} else if (compare > 0) {
false
} else {
//如果都一样,那么比较名字,按照字母顺序比较,所以名字比较重要
s1.name }
}
}

注:

公平原则本着的原则就是谁最需要就给谁,所以挨饿者优先;

资源占用比这块有点费解,如果把他理解成一个贪心问题就容易理解了。对于都是出于挨饿状态的任务可以这样理解,负载大的即时给你资源你也不一定能有效缓解,莫不如给负载小的,让其快速使用,完成后可以释放更多的资源,这是一种贪心策略。如JobA和JobB的Task数量相同都是10,A的minShare是2,B的是5,那占用比为5和2,显然B的占用比更小,贪心的策略应该给B先调度处理;

对于都处于满足状态的,当然谁的权重有着更好的决定性,权重比低得优先(偏向权利大的);

如果所有上述的比较都相同,那么名字字典排序靠前的优先(哈哈,名字很重要哦);名字aaa要比abc优先,所以这里在给Pool或者TaskSetManager起名字的时候要考虑这一点

 (备注来源:https://yq.aliyun.com/articles/6041)

 补充:这两种调度的排序算法针对的可比较对象都是Schedule的具体对象,这里我们对这个对象Schedulable做简单的解释。

       前面讲到,Schedulable可调度对象在Spark有两种形式:Pool和TaskSetManager。Pool是一个调度池,Pool里面还可以有子Pool,Spark中的rootPool即根节点默认是一个无名(default)的Pool。对于FIFO和FAIR有不同的层次。

       对于FIFO模式的调度,rootPool管理的直接就是TaskSetManager,没有子Pool这个概念,就只有两层,rootPool和叶子节点TaskSetManager,实现如下所示。


但对于FAIR这种模式来说,是三层的,根节点是rootPool,为无名Pool,下一层为用户定义的Pool(不指定名称默认名称为default),再下一层才是TaskSetManager,即根调度池管理一组调度池,每个调度池管理自己的TaskSetManager,其实现如下所示。



这里的调度顺序是指在一个SparkContext之内的调度,一般情况下我们自行使用是不太会需要Pool这个概念的,因为不存在Pool之间的竞争,但如果我们提供一个Spark应用,大家都可以提交任务,服务端有一个常驻的任务,对应一个SparkContext,每个用户提交的任务都由其代理执行,那么针对每个用户提交的任务可以按照用户等级和任务优先级设置一个Pool,这样不同的用户的Pool之间就存在竞争关系了,可以用Pool的优先级来区分任务和用户的优先级了,**但要再强调一点名字很重要,因为FAIR机制中,如果其他比较无法判断,那么会按照名字来进行字典排序的**。(补充来源:https://yq.aliyun.com/articles/6041)

5.创建DAGScheduler,调用TaskScheduler#start方法(SparkContext初始化过程中)

//SparkContext.scala
525) _dagScheduler = new DAGScheduler(this)
526) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
527)
528) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
529) // constructor
530) _taskScheduler.start()


//TaskSchedulerImpl.scala
override def start() {
//启动SparkDeploySchedulerBackend
backend.start()

if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}


6.启动Executor

override def start() {
super.start()
launcherBackend.connect()

// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
//定义分配资源的进程名称
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
//省略部分代码,详细内容参见前几节,Executor注册过程。
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
//注册应用程序
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
在client#start方法中最终会注册应用程序。

二、总结

在SparkContext实例化的时候调用createTaskScheduler来创建TaskSchedulerImpl和SparkDeploySchedulerBackend,同时在SparkContext实例化的时候会调用TaskSchedulerImpl#start方法,在该方法中会调用SparkDeploySchedulerBackend#start;在这个start方法中会创建AppClient对象并调用AppClient#start方法,这时会创建ClientEndpoint,在创建ClientEndpoint时会传入来指定具体为当前应用程序启动的Executor进程的入口类的名称为CoarseGrainedExecutorBackend,然后ClientEndpoint启动并通过tryRegisterMaster来注册当前的应用程序到Master中,Master接收到注册信息后,如果可以运行程序,则会为该程序生成JobID并通过Schedule来分配计算资源,具体计算资源分配是通过应用程序运行方式、Memory、core等配置信息来决定的,最后Master会发送指令给Worker;Worker中为当前应用程序分配计算资源时,首先分配ExecutorRunner,ExecutorRunner内部会通过Thread的方式构建ProcessBuilder来启动另一个JVM进程,这个JVM进程启动时候会加载的main方法所在的类的名称就是创建ClientEndpoint传入的Command指定的入口类CoarseGrainedExecutorBackend,此时JVM在通过ProcessBuilder启动的时候获得了CoarseGrainedExecutorBackend后,加载并调用其中的main方法。在main方法中会实例化CoarseGrainedExecutorBackend本身这个消息循环体,而其实例化时会通过回调onStart向DriverEndpoint发送RegisterExecutor来注册当前的CoarseGrainedExecutorBackend,此时DriverEndpoint收到该注册信息并保存在了SparkDeployScheduler实例的内存数据结构中,这样Driver就获得了计算资源!


Task运行各个阶段交互过程





                                                                   图35-6 资源分配过程





备注:有关FIFO、FAIR调度算法解析部分参考 张安站 --Spark技术内幕一书


-----------------------------------------EOF---------------------------------------------------------------------------------------------------------




推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • 本文介绍了在 Java 编程中遇到的一个常见错误:对象无法转换为 long 类型,并提供了详细的解决方案。 ... [详细]
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • 利用 Python Socket 实现 ICMP 协议下的网络通信
    在计算机网络课程的2.1实验中,学生需要通过Python Socket编程实现一种基于ICMP协议的网络通信功能。与操作系统自带的Ping命令类似,该实验要求学生开发一个简化的、非标准的ICMP通信程序,以加深对ICMP协议及其在网络通信中的应用的理解。通过这一实验,学生将掌握如何使用Python Socket库来构建和解析ICMP数据包,并实现基本的网络探测功能。 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 投融资周报 | Circle 达成 4 亿美元融资协议,唯一艺术平台 A 轮融资超千万美元 ... [详细]
  • 本文详细探讨了Zebra路由软件中的线程机制及其实际应用。通过对Zebra线程模型的深入分析,揭示了其在高效处理网络路由任务中的关键作用。文章还介绍了线程同步与通信机制,以及如何通过优化线程管理提升系统性能。此外,结合具体应用场景,展示了Zebra线程机制在复杂网络环境下的优势和灵活性。 ... [详细]
author-avatar
beat_小然
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有