The main work done in SparkContext (the Spark version here is 1.3.1) is:
1. Create the SparkEnv, which contains a very important object, the ActorSystem
2. Create the TaskScheduler; the appropriate TaskScheduler is created according to the cluster the job is submitted to
3. Create the DAGScheduler
4. Call taskScheduler.start() to start the scheduler
Part of the source code is as follows:
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // TODO This method creates an ActorSystem
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // Create and start the scheduler
  // TODO Create a TaskScheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // TODO Create an Actor via the ActorSystem; it handles the heartbeat between the Executors and the DriverActor
  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    // TODO Create a DAGScheduler, which will later split the DAG into Stages
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // TODO Start the taskScheduler
  taskScheduler.start()
}
The createTaskScheduler method is called from the SparkContext constructor; part of its source code is shown below:
// TODO Create the appropriate TaskScheduler based on the master URL specified when the job was submitted
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  master match {
    // TODO Spark standalone mode
    case SPARK_REGEX(sparkUrl) =>
      // TODO Create a TaskSchedulerImpl
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      // TODO Create a SparkDeploySchedulerBackend
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      // TODO Call initialize to set up the scheduler
      scheduler.initialize(backend)
      (backend, scheduler)
    // ... cases for the other deployment modes are omitted here
  }
}
Here, the matching TaskScheduler object is created according to how the job was submitted, then a SparkDeploySchedulerBackend object is created, and finally initialize() is called to set up the scheduler. Next, let's look at what happens inside initialize().
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      // The default scheduling mode is FIFO
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
As we can see, initialize() selects the scheduling mode; note that Spark's default scheduling mode is FIFO.
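For context, the mode is driven by the spark.scheduler.mode configuration property. Below is a minimal sketch of switching to the FAIR scheduler when building a SparkContext; the application name is made up for illustration, but spark.scheduler.mode itself is a standard Spark setting.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: switch the in-application scheduling mode from the default FIFO to FAIR.
// "FairSchedulerDemo" is just an illustrative application name.
val conf = new SparkConf()
  .setAppName("FairSchedulerDemo")
  .set("spark.scheduler.mode", "FAIR") // default is "FIFO"
val sc = new SparkContext(conf)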
After the TaskScheduler has been created, the SparkContext constructor calls taskScheduler.start() to start the task scheduler. Next, let's look at what happens inside. Note that if you are looking at a deployment mode other than a Spark standalone cluster, the implementation class is different; here I am looking at the TaskSchedulerImpl class.
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging {

  override def start() {
    // TODO First call the start method of SparkDeploySchedulerBackend
    backend.start()
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
          SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }
Here, the start() method of SparkDeploySchedulerBackend is called first; let's keep tracing into its source code:
override def start() {
  // TODO First call the parent class's start method to create the DriverActor
  super.start()

  // The endpoint for executors to talk to us
  // TODO Prepare some parameters; later they are encapsulated into an object that is sent to the Master
  val driverUrl = AkkaUtils.address(
    AkkaUtils.protocol(actorSystem),
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // TODO Important: this parameter names the implementation class of the future Executor
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  // TODO Encapsulate the parameters into an ApplicationDescription
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec)
  // TODO Create an AppClient and pass the ApplicationDescription in through its primary constructor
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  // TODO Then call AppClient's start method, which creates a ClientActor used to communicate with the Master
  client.start()
  waitForRegistration()
}
This method first calls the parent class's start() method; let's follow it and see what Spark does inside. Here we need to look at the start() method of the CoarseGrainedSchedulerBackend class:
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }
  // TODO (prashant) send conf instead of properties
  // TODO Create the DriverActor via the ActorSystem
  driverActor = actorSystem.actorOf(
    Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
The code above creates a DriverActor object, which is used to communicate with the Executors; the ActorSystem here was created in the SparkEnv of the SparkContext.
Next, Spark needs to package some of the parameters we configured, as follows:
// TODO Prepare some parameters; later they are encapsulated into an object that is sent to the Master
val driverUrl = AkkaUtils.address(
  AkkaUtils.protocol(actorSystem),
  SparkEnv.driverActorSystemName,
  conf.get("spark.driver.host"),
  conf.get("spark.driver.port"),
  CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(
  "--driver-url", driverUrl,
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
  .map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
  .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
  .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// When testing, expose the parent class path to the child. This is processed by
// compute-classpath.{cmd,sh} and makes all needed jars available to child processes
// when the assembly is built with the "*-provided" profiles enabled.
val testingClassPath =
  if (sys.props.contains("spark.testing")) {
    sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
  } else {
    Nil
  }
Then a very important parameter needs to be packaged; it is mainly used later when the Worker process launches the Executor process, as follows:
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
// TODO Important: this parameter names the implementation class of the future Executor
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
// TODO Encapsulate the parameters into an ApplicationDescription
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  appUIAddress, sc.eventLogDir, sc.eventLogCodec)
// TODO Create an AppClient and pass the ApplicationDescription in through its primary constructor
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
// TODO Then call AppClient's start method, which creates a ClientActor used to communicate with the Master
client.start()
Then an AppClient object is created to communicate with the Master; calling client.start() here creates the ClientActor and triggers its actor lifecycle methods.
Next, let's start from the preStart() method:
// TODO Lifecycle method of the ClientActor
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    // TODO The ClientActor registers with the Master
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
As we can see, this mainly registers with the Master; let's look inside the registration methods:
def registerWithMaster() {
  // TODO Register with the Master
  tryRegisterAllMasters()
  import context.dispatcher
  var retries = 0
  registrationRetryTimer = Some {
    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
      Utils.tryOrExit {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          tryRegisterAllMasters()
        }
      }
    }
  }
}

def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    // TODO Loop over all Master addresses and establish a connection to each Master
    val actor = context.actorSelection(masterAkkaUrl)
    // TODO With a reference to the Master, send it an application-registration request; all the parameters are encapsulated in appDescription
    actor ! RegisterApplication(appDescription)
  }
}
The main flow is:
1. Send the registration message RegisterApplication(appDescription) to all Masters. RegisterApplication is a case class, and appDescription is the packaged parameter object we analyzed above (see the sketch after this list).
2. If the number of retries exceeds a certain limit, the client gives up and reports an error.
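For reference, here is a rough sketch of the registration messages, reconstructed from the call sites above and below; the real definitions live in org.apache.spark.deploy.DeployMessages, and the field names here are assumptions for illustration.

// Sketch reconstructed from `actor ! RegisterApplication(appDescription)` above.
case class RegisterApplication(appDescription: ApplicationDescription)

// Sketch of the reply the Master sends back on success (see the Master-side handler below).
case class RegisteredApplication(appId: String, masterUrl: String)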
Next, let's see how the Master handles the registration message after receiving it. The following code is from the Master class:
// TODO The application-registration message sent over by the ClientActor
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // TODO First put the application information into in-memory storage
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // TODO Save it with the persistence engine
    persistenceEngine.addApplication(app)
    // TODO The Master sends a registration-success message back to the ClientActor
    sender ! RegisteredApplication(app.id, masterUrl)
    // TODO Important: the Master starts scheduling resources, i.e. deciding which Workers to launch the application on
    schedule()
  }
}
The main flow of this code is: save the configuration information of the application we submitted and persist it (mainly to guarantee high availability), then send a message back to the Driver indicating that registration succeeded, and finally call schedule(), which is a very important method.
There are two scheduling strategies here: one spreads executors out across as many nodes as possible, and the other packs them onto as few nodes as possible.
// TODO Below are the two scheduling strategies: one spreads apps out, the other packs them together
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
  // Try to spread out each app among all the nodes, until it has all its cores
  for (app <- waitingApps if app.coresLeft > 0) {
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(canUse(app, _)).sortBy(_.coresFree).reverse
    val numUsable = usableWorkers.length
    val assigned = new Array[Int](numUsable) // Number of cores to give on each node
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    var pos = 0
    while (toAssign > 0) {
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable
    }
    // Now that we've decided how many cores to give on each node, let's actually give them
    for (pos <- 0 until numUsable) {
      if (assigned(pos) > 0) {
        val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
        // TODO The Master sends a message telling the Worker to launch an Executor
        launchExecutor(usableWorkers(pos), exec)
        app.state = ApplicationState.RUNNING
      }
    }
  }
} else {
  // Pack each app into as few nodes as possible until we've assigned all its cores
  for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
    for (app <- waitingApps if app.coresLeft > 0) {
      if (canUse(app, worker)) {
        val coresToUse = math.min(worker.coresFree, app.coresLeft)
        if (coresToUse > 0) {
          val exec = app.addExecutor(worker, coresToUse)
          // TODO The Master sends a message telling the Worker to launch an Executor
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  }
}
The logic of the spread-out strategy is:
1. First filter the Workers: keep only those that are alive and can still be used by this application, sorted by free CPU cores in descending order
2. Create an array to hold the cores to assign; its size is the number of usable Workers
3. Then assign cores in a round-robin fashion (see the standalone sketch after the next list)
The logic of the consolidate strategy is:
1. First filter the Workers by their free resources and by whether they can still be used by this application
2. Assign as many of one Worker's free cores as possible; if one Worker cannot satisfy the request, continue with the next Worker
3. Then continue assigning resources for the other applications
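To make the round-robin idea concrete, here is a small self-contained sketch of the spread-out assignment; this is not Spark code, and the names spreadOut, workerFreeCores and coresWanted are invented for illustration.

// Standalone illustration of the spread-out (round-robin) core assignment above.
def spreadOut(workerFreeCores: Array[Int], coresWanted: Int): Array[Int] = {
  val assigned = new Array[Int](workerFreeCores.length) // cores to give on each worker
  var toAssign = math.min(coresWanted, workerFreeCores.sum)
  var pos = 0
  while (toAssign > 0) {
    // Hand out one core at a time to the next worker that still has a free core
    if (workerFreeCores(pos) - assigned(pos) > 0) {
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % workerFreeCores.length
  }
  assigned
}

// Example: 3 workers with 4, 2 and 3 free cores, and an app asking for 6 cores.
// Round-robin yields Array(2, 2, 2), i.e. the load is spread across all workers.
println(spreadOut(Array(4, 2, 3), 6).mkString(", "))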
After resource allocation is done, the Executors are launched:
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // TODO Record the resources used on this Worker
  worker.addExecutor(exec)
  // TODO The Master sends a message to the Worker, passing the parameters via a case class, telling it to launch an Executor
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // TODO The Master sends a message to the ClientActor, telling it that the Executor has been launched
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
The main logic is:
1. Record the resources used on the Worker
2. Send the information packaged above to the Worker and have the Worker launch the Executor
3. Finally notify the Driver that the Executor has been launched (a sketch of the two messages involved follows this list).
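For reference, here is a rough sketch of the two messages used in launchExecutor, reconstructed from their call sites above; the real definitions are in org.apache.spark.deploy.DeployMessages, and the field names here are assumptions for illustration.

// Sketch reconstructed from `worker.actor ! LaunchExecutor(...)` above.
case class LaunchExecutor(
    masterUrl: String,
    appId: String,
    execId: Int,
    appDesc: ApplicationDescription,
    cores: Int,
    memory: Int)

// Sketch reconstructed from `exec.application.driver ! ExecutorAdded(...)` above.
case class ExecutorAdded(
    id: Int,
    workerId: String,
    hostPort: String,
    cores: Int,
    memory: Int)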