We start the analysis from the Master's completeRecovery method, which drives the whole failover flow. The code is as follows:
/**
 * Completes the active/standby switchover: when the active Master dies, this finishes
 * bringing the standby Master up as the new active Master.
 */
def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  synchronized {
    if (state != RecoveryState.RECOVERING) { return }
    state = RecoveryState.COMPLETING_RECOVERY
  }

  // Kill off any workers and apps that didn't respond to us.
  // Filter out the Workers and Applications still in the UNKNOWN state, then iterate over them,
  // calling removeWorker and finishApplication respectively to clean up Workers and Applications
  // that may have died or become faulty.
  // In short: 1. remove them from the in-memory caches (HashMaps);
  //           2. remove them from the in-memory structures of the related components (Executors and Drivers);
  //           3. remove them from persistent storage.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  // Re-launch the drivers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  // Switch the master's state to ALIVE
  state = RecoveryState.ALIVE
  // Have the master schedule resources again
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
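To see why the short synchronized block at the top gives "only-once" semantics, here is a minimal, self-contained sketch (the names and structure are mine, not Spark's): only the first caller can move the state from RECOVERING to COMPLETING_RECOVERY and run the clean-up work; any later call returns immediately.

object RecoveryGuardSketch {
  object RecoveryState extends Enumeration {
    val STANDBY, RECOVERING, COMPLETING_RECOVERY, ALIVE = Value
  }
  @volatile private var state = RecoveryState.RECOVERING

  // Runs doRecovery at most once: later (or concurrent) callers see a state
  // other than RECOVERING and bail out inside the synchronized block.
  def completeRecoveryOnce(doRecovery: () => Unit): Unit = {
    synchronized {
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY
    }
    doRecovery()                      // the actual clean-up work runs outside the lock
    state = RecoveryState.ALIVE
  }

  def main(args: Array[String]): Unit = {
    completeRecoveryOnce(() => println("recovery work"))   // prints once
    completeRecoveryOnce(() => println("recovery work"))   // no-op: state is already ALIVE
  }
}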
Next comes removing the worker's information. The code is as follows:
/**
 * Clean up a worker in UNKNOWN state.
 */
def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // Mark the worker as DEAD
  worker.setState(WorkerState.DEAD)
  // idToWorker is the in-memory cache of all worker info (essentially a HashMap);
  // remove the given worker's ID from it
  idToWorker -= worker.id
  // addressToWorker, like idToWorker, caches all workers, but keyed by actor address;
  // remove the given worker's address from it as well
  addressToWorker -= worker.actor.path.address

  // Iterate over all executors on this worker, tell each application (driver) that the
  // executor it depends on is lost, and remove the executor from the application
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    exec.application.driver ! ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None)
    exec.application.removeExecutor(exec)
  }

  // Iterate over all drivers on this worker: a driver submitted with supervision enabled
  // is re-launched; an unsupervised driver is removed
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }

  // Finally, remove the worker's persisted information
  persistenceEngine.removeWorker(worker)
}
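Note that idToWorker and addressToWorker are two indexes over the same WorkerInfo objects, keyed by id and by actor address respectively, so removal has to clean both; otherwise a lookup by address could still return a dead worker. A minimal, self-contained sketch of that double bookkeeping (simplified types, not Spark's actual fields):

import scala.collection.mutable

object WorkerCacheSketch {
  // Hypothetical stand-in for WorkerInfo: just an id and an actor address.
  case class WorkerRef(id: String, address: String)

  val idToWorker      = new mutable.HashMap[String, WorkerRef]
  val addressToWorker = new mutable.HashMap[String, WorkerRef]

  def register(w: WorkerRef): Unit = {
    idToWorker(w.id)           = w
    addressToWorker(w.address) = w
  }

  def remove(w: WorkerRef): Unit = {
    // Both indexes must be cleaned, or a lookup by address would still find a dead worker.
    idToWorker      -= w.id
    addressToWorker -= w.address
  }

  def main(args: Array[String]): Unit = {
    val w = WorkerRef("worker-20230907-0001", "akka.tcp://sparkWorker@host:7078")
    register(w)
    remove(w)
    println(idToWorker.isEmpty && addressToWorker.isEmpty)   // true
  }
}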
Next comes removing the Application's information. The code is as follows:
/**
 * Finish an Application in UNKNOWN state.
 */
def finishApplication(app: ApplicationInfo) {
  // A single line: delegate to the removal method below, passing FINISHED as the end state
  removeApplication(app, ApplicationState.FINISHED)
}

/**
 * Remove an Application, recording the given end state.
 */
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  // Check whether the master's application cache (a HashSet) contains the given app
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    // Remove the app from the master's in-memory caches
    apps -= app
    idToApp -= app.id
    actorToApp -= app.driver
    addressToApp -= app.driver.path.address
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach( a => {
        appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
        applicationMetricsSystem.removeSource(a.appSource)
      })
      completedApps.trimStart(toRemove)
    }
    completedApps += app // Remember it in our history
    waitingApps -= app

    // If application events are logged, use them to rebuild the UI
    rebuildSparkUI(app)

    // Remove the executors the app uses: via each executor's worker actor,
    // send a KillExecutor message so the worker kills that executor
    for (exec <- app.executors.values) {
      exec.worker.removeExecutor(exec)
      exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
      exec.state = ExecutorState.KILLED
    }

    // Mark the app as finished; if it did not end in the FINISHED state,
    // tell the app's driver that it was removed
    app.markFinished(state)
    if (state != ApplicationState.FINISHED) {
      app.driver ! ApplicationRemoved(state.toString)
    }

    // Remove the app's persisted information
    persistenceEngine.removeApplication(app)
    // Schedule again
    schedule()

    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.actor ! ApplicationFinished(app.id)
    }
  }
}
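One detail worth pausing on is the history cap: once completedApps reaches RETAINED_APPLICATIONS, the oldest roughly 10% of entries are dropped (their UI detached and their metrics source removed) before the new app is appended. A stripped-down sketch of just that trimming arithmetic, with my own names and no UI or metrics handling:

import scala.collection.mutable.ArrayBuffer

object CompletedAppsSketch {
  // In Spark this comes from spark.deploy.retainedApplications (default 200).
  val RETAINED_APPLICATIONS = 200
  val completedApps = new ArrayBuffer[String]   // app ids only, for illustration

  def remember(appId: String): Unit = {
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)   // drop the oldest ~10%, at least one
      completedApps.trimStart(toRemove)                        // removes the first toRemove entries
    }
    completedApps += appId                                     // newest app goes to the end
  }

  def main(args: Array[String]): Unit = {
    (1 to 205).foreach(i => remember(s"app-$i"))
    println(completedApps.size)   // 185: trimmed from 200 to 180 at the 201st app, then 5 more appended
  }
}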
The two pieces of code above rely heavily on WorkerInfo and ApplicationInfo, which are worth a closer look. Their source is as follows:
private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val actor: ActorRef,
    val webUiPort: Int,
    val publicAddress: String)

private[spark] class ApplicationInfo(
    val startTime: Long,
    val id: String,
    val desc: ApplicationDescription,
    val submitDate: Date,
    val driver: ActorRef,
    defaultCores: Int)
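Only the constructor parameters are shown above. From the way the earlier snippets use these objects (worker.setState, worker.executors, worker.drivers, app.executors, app.markFinished), both classes also carry mutable runtime state. Below is a hypothetical, heavily simplified sketch of that state, just to make the earlier code easier to follow; the field types are my guesses, not the real declarations:

import scala.collection.mutable

// Hypothetical, heavily simplified view of the runtime state used by removeWorker;
// the real class lives in org.apache.spark.deploy.master and has more members.
class WorkerInfoSketch(val id: String, val host: String, val port: Int) {
  var state: String = "ALIVE"                           // WorkerState.Value in Spark
  val executors = new mutable.HashMap[String, String]   // executor id -> executor description
  val drivers   = new mutable.HashMap[String, String]   // driver id -> driver description
  def setState(s: String): Unit = { state = s }
}

// Same idea for the application side, covering the calls made by removeApplication.
class ApplicationInfoSketch(val id: String) {
  var state: String = "RUNNING"                         // ApplicationState.Value in Spark
  val executors = new mutable.HashMap[Int, String]      // executor id -> executor description
  def markFinished(endState: String): Unit = { state = endState }
  def removeExecutor(execId: Int): Unit = { executors -= execId }
}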
Finally, there is the removal of driver information and the re-launch of supervised drivers. This code was already shown above in the first snippet, completeRecovery:
// Reschedule drivers which were not claimed by any workers
// Re-launch the drivers
drivers.filter(_.worker.isEmpty).foreach { d =>
  logWarning(s"Driver ${d.id} was not found after master recovery")
  if (d.desc.supervise) {
    logWarning(s"Re-launching ${d.id}")
    relaunchDriver(d)
  } else {
    removeDriver(d.id, DriverState.ERROR, None)
    logWarning(s"Did not re-launch ${d.id} because it was not supervised")
  }
}

// Switch the master's state to ALIVE
state = RecoveryState.ALIVE
// Have the master schedule resources again
schedule()
logInfo("Recovery complete - resuming operations!")