热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

2.Master主备机制切换源码分析

先看下原理图:从Master的 completeRecovery方法开始分析,代码如下:***完成主备机切换,当主Master挂掉的时候完成StandByMaster的启动*def

先看下原理图:

2.Master主备机制切换源码分析

从Master的 completeRecovery方法开始分析 , 代码如下:

  1. /**
  2. * 完成主备机切换 , 当主Master挂掉的时候完成StandByMaster的启动
  3. */
  4. def completeRecovery() {
  5. // Ensure "only-once" recovery semantics using a short synchronization period.
  6. synchronized {
  7. if (state != RecoveryState.RECOVERING) { return }
  8. state = RecoveryState.COMPLETING_RECOVERY
  9. }
  10. // Kill off any workers and apps that didn't respond to us.
  11. // 将Application和Worker的信息为UNKNOW的过滤出来,然后便利每一个信息
  12. // 分别调用finishApplication和removeWorker对可能出现死掉或者有故障的Application和Worker进行清理
  13. // 总结 : 1.从内存缓存中(HashMap)移除Worker和Application信息; 2.从相关的组件(Executor和Driver)的内存缓存结构中移除; 3.从持久化存储中移除
  14. workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  15. apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
  16. // Reschedule drivers which were not claimed by any workers
  17. // 重新发布Driver
  18. drivers.filter(_.worker.isEmpty).foreach { d =>
  19. logWarning(s"Driver ${d.id} was not found after master recovery")
  20. if (d.desc.supervise) {
  21. logWarning(s"Re-launching ${d.id}")
  22. relaunchDriver(d)
  23. } else {
  24. removeDriver(d.id, DriverState.ERROR, None)
  25. logWarning(s"Did not re-launch ${d.id} because it was not supervised")
  26. }
  27. }
  28. // 将master的状态更改为ALIVE
  29. state = RecoveryState.ALIVE
  30. // master重新进行资源调度
  31. schedule()
  32. logInfo("Recovery complete - resuming operations!")
  33. }

然后是对worker信息移除 , 代码如下:

  1. /**
  2. * 清理掉UNKNOW状态的worker
  3. */
  4. def removeWorker(worker: WorkerInfo) {
  5. logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  6. // 设置状态为DEAD
  7. worker.setState(WorkerState.DEAD)
  8. // idToWorker为所有worker信息的缓存队列 , 其实就是一HashMap , 将传递过来的worker的ID从该缓存队列中移除掉
  9. idToWorker -= worker.id
  10. // addressToWorker同idToWorker一样 , 缓存所有worker的地址 , 这里也是将传递过来的worker的address从该缓存队列中移除掉
  11. addressToWorker -= worker.actor.path.address
  12. // 遍历worker中所有的executor , 告诉App所依赖运行的executor信息丢失并移除掉
  13. for (exec <- worker.executors.values) {
  14. logInfo("Telling app of lost executor: " + exec.id)
  15. exec.application.driver ! ExecutorUpdated(
  16. exec.id, ExecutorState.LOST, Some("worker lost"), None)
  17. exec.application.removeExecutor(exec)
  18. }
  19. // 遍历worker中所有的driver , 若被StandByMaster监控则重新启动 , 没有被监控则移除掉driver
  20. for (driver <- worker.drivers.values) {
  21. if (driver.desc.supervise) {
  22. logInfo(s"Re-launching ${driver.id}")
  23. relaunchDriver(driver)
  24. } else {
  25. logInfo(s"Not re-launching ${driver.id} because it was not supervised")
  26. removeDriver(driver.id, DriverState.ERROR, None)
  27. }
  28. }
  29. // 最后将worker的持久化信息移除掉
  30. persistenceEngine.removeWorker(worker)
  31. }

接着是Application信息移除 , 代码如下:

  1. /**
  2. * 结束掉UNKNOW状态的Application
  3. */
  4. def finishApplication(app: ApplicationInfo) {
  5. // 只有这一行代码 , 将Application的状态更改为FINISH , 调用app的重构移除方法
  6. removeApplication(app, ApplicationState.FINISHED)
  7. }
  8. /**
  9. * 结束掉UNKNOW状态的Application
  10. */
  11. def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  12. // 检查master的Application缓存队列(HashSet)中是否包含传递过来的app信息
  13. if (apps.contains(app)) {
  14. logInfo("Removing app " + app.id)
  15. // 在master的Application缓存队列中移除传递过来的app相关信息
  16. apps -= app
  17. idToApp -= app.id
  18. actorToApp -= app.driver
  19. addressToApp -= app.driver.path.address
  20. if (completedApps.size >= RETAINED_APPLICATIONS) {
  21. val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
  22. completedApps.take(toRemove).foreach( a => {
  23. appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
  24. applicationMetricsSystem.removeSource(a.appSource)
  25. })
  26. completedApps.trimStart(toRemove)
  27. }
  28. completedApps += app // Remember it in our history
  29. waitingApps -= app
  30. // If application events are logged, use them to rebuild the UI
  31. rebuildSparkUI(app)
  32. // 移除app所依赖的executor信息 , 获取executor的actor发送消息给masterUrl杀掉该executor
  33. for (exec <- app.executors.values) {
  34. exec.worker.removeExecutor(exec)
  35. exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
  36. exec.state = ExecutorState.KILLED
  37. }
  38. // 获取app所依赖的driver发送结束掉该App的信息
  39. app.markFinished(state)
  40. if (state != ApplicationState.FINISHED) {
  41. app.driver ! ApplicationRemoved(state.toString)
  42. }
  43. // 从持久化中移除掉app信息
  44. persistenceEngine.removeApplication(app)
  45. // 重新调度
  46. schedule()
  47. // Tell all workers that the application has finished, so they can clean up any app state.
  48. // 告知每一个worker节点该App已经结束掉
  49. workers.foreach { w =>
  50. w.actor ! ApplicationFinished(app.id)
  51. }
  52. }
  53. }

上面两端代码中需要对WorkerInfo和ApplicationInfo信息进行详细了解一下 , 源码如下:

  1. private[spark] class WorkerInfo(
  2. val id: String,
  3. val host: String,
  4. val port: Int,
  5. val cores: Int,
  6. val memory: Int,
  7. val actor: ActorRef,
  8. val webUiPort: Int,
  9. val publicAddress: String)

  1. private[spark] class ApplicationInfo(
  2. val startTime: Long,
  3. val id: String,
  4. val desc: ApplicationDescription,
  5. val submitDate: Date,
  6. val driver: ActorRef,
  7. defaultCores: Int)

最后就是Driver信息的移除和被监控的Driver重新启动代码 , 其实在上面的第一段代码completeRecover中已经贴出:

  1. // Reschedule drivers which were not claimed by any workers
  2. // 重新发布Driver
  3. drivers.filter(_.worker.isEmpty).foreach { d =>
  4. logWarning(s"Driver ${d.id} was not found after master recovery")
  5. if (d.desc.supervise) {
  6. logWarning(s"Re-launching ${d.id}")
  7. relaunchDriver(d)
  8. } else {
  9. removeDriver(d.id, DriverState.ERROR, None)
  10. logWarning(s"Did not re-launch ${d.id} because it was not supervised")
  11. }
  12. }
  13. // 将master的状态更改为ALIVE
  14. state = RecoveryState.ALIVE
  15. // master重新进行资源调度
  16. schedule()
  17. logInfo("Recovery complete - resuming operations!")




推荐阅读
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • LVS实现负载均衡的原理LVS负载均衡负载均衡集群是LoadBalance集群。是一种将网络上的访问流量分布于各个节点,以降低服务器压力,更好的向客户端 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文介绍了django中视图函数的使用方法,包括如何接收Web请求并返回Web响应,以及如何处理GET请求和POST请求。同时还介绍了urls.py和views.py文件的配置方式。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • Todayatworksomeonetriedtoconvincemethat:今天在工作中有人试图说服我:{$obj->getTableInfo()}isfine ... [详细]
  • Asp.net Mvc Framework 七 (Filter及其执行顺序) 的应用示例
    本文介绍了在Asp.net Mvc中应用Filter功能进行登录判断、用户权限控制、输出缓存、防盗链、防蜘蛛、本地化设置等操作的示例,并解释了Filter的执行顺序。通过示例代码,详细说明了如何使用Filter来实现这些功能。 ... [详细]
  • Centos下安装memcached+memcached教程
    本文介绍了在Centos下安装memcached和使用memcached的教程,详细解释了memcached的工作原理,包括缓存数据和对象、减少数据库读取次数、提高网站速度等。同时,还对memcached的快速和高效率进行了解释,与传统的文件型数据库相比,memcached作为一个内存型数据库,具有更高的读取速度。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
  • 本文介绍了在Oracle数据库中创建序列时如何选择cache或nocache参数。cache参数可以提高序列的存取速度,但可能会导致序列丢失;nocache参数可以避免序列丢失,但在高并发访问时可能导致性能问题。文章详细解释了两者的区别和使用场景。 ... [详细]
author-avatar
klolo先生
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有