热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

hadoop作业调度-源码分析

TaskScheduler是作业调度器的抽象基类.具体的实现有:JobQueueTaskScheduler:默认的FIFO调度队列LimitTasksPerJobTaskSc

 

TaskScheduler是作业调度器的抽象基类. 具体的实现有:

  • JobQueueTaskScheduler: 默认的FIFO调度队列
  • LimitTasksPerJobTaskScheduler: 扩展自JobQueueTaskScheduler, 可以对每个Job的task总数作限制.
  • CapacityScheduler: Yahoo开发的一个基于容量的作业调度器
  • FairScheduler: 公平调度器, 保证小任务得到快速响应, 大任务保证服务水平, 由facebook开发.

 

JobTracker维护一个TaskScheduler的实例,并托管其生命周期.

TaskScheduler实例伴随JobTracker一起构造

Code
    JobTracker(JobConf conf) throws IOException, InterruptedException {
        
        
// 创建调度器
        Class extends TaskScheduler> schedulerClass = conf.getClass("mapred.jobtracker.taskScheduler",   
            JobQueueTaskScheduler.
class, TaskScheduler.class);
        taskScheduler 
= (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
        
    }

 

启动JobTracker的主方法中, 构造JobTracker对象并给taskScheduler设置TaskTrackerManager实例字段:

Code
public static JobTracker startTracker(JobConf conf) throws IOException, InterruptedException {
    JobTracker result 
= null;
    
while (true) {
        
            result 
= new JobTracker(conf);
            
// 给taskScheduler设置TaskTrackerManager实例
            result.taskScheduler.setTaskTrackerManager(result);
            
break;
        
        Thread.sleep(
1000);
    }
    
if (result != null) {
        JobEndNotifier.startNotifier();
    }
    
return result;
}

 

 

在offerService中回调其start()方法:

public   void  offerService()  throws  InterruptedException, IOException {
    taskScheduler.start();
    
}

 

taskScheduler在JobTracker.heartbeat方法中进行任务指派:

 

Code
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
    
boolean restarted, boolean initialContact, 
    
boolean acceptNewTasks, short responseId) throws IOException {
    
    
// 如果当次心跳允许指派任务,且当前上报心跳的taskTracker可接收新task(传入的acceptNewTasks为true)
    //, 并且其不在黑名单中,
则进入调度
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
        TaskTrackerStatus taskTrackerStatus 
= getTaskTracker(trackerName);
        
if (taskTrackerStatus == null) {
            LOG.warn(
"Unknown task tracker polling; ignoring: " + trackerName);
        } 
else {
            List
<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
            
if (tasks == null) {
                
// 调用具体的taskScheduler实现来为当前taskTracker指派任务
                tasks = taskScheduler.assignTasks(taskTrackerStatus);
            }
            
if (tasks != null) {
                
for (Task task : tasks) {
                    expireLaunchingTasks.addNewTask(task.getTaskID());
                    LOG.debug(trackerName 
+ " -> LaunchTask: " + task.getTaskID());
                    actions.add(
new LaunchTaskAction(task));
                }
            }
        }
    }
    ...
}

 

 

 

taskScheduler销毁:

void  close()  throws  IOException {
    
    
if  (taskScheduler  !=   null ) {
        taskScheduler.terminate();
    }
    
}

 

taskScheduler中还有一个方法getJobsFromQueue(String queueName), 当前用于JobClient查询指定任务队列的Job信息.

TaskScheduler的实现的关键是分派任务的策略. 这体现在assignTasks方法中. TaskScheduler所需要的关于集群信息由
传入的TaskTrackerManager提供, TaskScheduler运行所需要的关于job的信息由JobQueueJobInProgressListener的Collection getJobQueue()方法提供.


JobQueueTaskScheduler是默认使用的TaskScheduler. 是一个单纯的FIFO.

其任务分配的规则相当简单:
对于Map任务:

1. 需要预留部分 task slot 以供失效任务,或应对边际效应的speculative任务之用.

2. 如果为当前TT找到 local task, 在不超过TT并发指标, 且满足集群task slot预留的基础上, 可继续为此 tt 分配 task.
    如果分配给当前TT的是remote task, 则当次分派完毕(也就是只会为当前TT分配一个任务)。

对于reduce任务:

  规则同上,不同之处在于不管有没有找到 local task, 每次heartbeat仅为给定 TT 分配一个task.

 

这里再补充一点信息, 在实际运营当中,当有某台机器的磁盘出现read only file-system时, 整个Job都会被挂死.
原因是因为机器任务失败后,其TaskTracker仍会不断和JobTracker进行心跳来领任务,而对于 ROFS这种情况,其后续Task的执行也必然错误.
这样,这个TT不断的错误,直到超过Job中容忍的错误上限时整个Job失败.
当前job的失败警戒值为mapred.max.map.failures还有mapred.max.reduce.failures.当job中map或reduce总共有这么多次失败后,job就会宣告失败.
对于tasktracker来讲,有个属性mapred.max.tasktracker.failures用来指定对于给定的job单个tt上最多能失败多少次, 如果超过阀值此tt会被拉入给定jobtracker的黑名单.

对于ROFS错误,要么是文件系统错误,要么是硬盘损坏。
对于文件系统错误,首先可以通过fsck进行文件系统修复,如果修复未果,只能重启,如果重启未果,则需要进行操作系统修复。
如果是磁盘损坏,若支持热插拔最好,否则只能关机换盘.


推荐阅读
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • 本文详细介绍了一种利用 ESP8266 01S 模块构建 Web 服务器的成功实践方案。通过具体的代码示例和详细的步骤说明,帮助读者快速掌握该模块的使用方法。在疫情期间,作者重新审视并研究了这一未被充分利用的模块,最终成功实现了 Web 服务器的功能。本文不仅提供了完整的代码实现,还涵盖了调试过程中遇到的常见问题及其解决方法,为初学者提供了宝贵的参考。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • Squaretest:自动生成功能测试代码的高效插件
    本文将介绍一款名为Squaretest的高效插件,该工具能够自动生成功能测试代码。使用这款插件的主要原因是公司近期加强了代码质量的管控,对各项目进行了严格的单元测试评估。Squaretest不仅提高了测试代码的生成效率,还显著提升了代码的质量和可靠性。 ... [详细]
  • ButterKnife 是一款用于 Android 开发的注解库,主要用于简化视图和事件绑定。本文详细介绍了 ButterKnife 的基础用法,包括如何通过注解实现字段和方法的绑定,以及在实际项目中的应用示例。此外,文章还提到了截至 2016 年 4 月 29 日,ButterKnife 的最新版本为 8.0.1,为开发者提供了最新的功能和性能优化。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • 本文详细解析了Java类加载系统的父子委托机制。在Java程序中,.java源代码文件编译后会生成对应的.class字节码文件,这些字节码文件需要通过类加载器(ClassLoader)进行加载。ClassLoader采用双亲委派模型,确保类的加载过程既高效又安全,避免了类的重复加载和潜在的安全风险。该机制在Java虚拟机中扮演着至关重要的角色,确保了类加载的一致性和可靠性。 ... [详细]
  • 本文介绍了一种自定义的Android圆形进度条视图,支持在进度条上显示数字,并在圆心位置展示文字内容。通过自定义绘图和组件组合的方式实现,详细展示了自定义View的开发流程和关键技术点。示例代码和效果展示将在文章末尾提供。 ... [详细]
  • Spring框架中枚举参数的正确使用方法与技巧
    本文详细阐述了在Spring Boot框架中正确使用枚举参数的方法与技巧,旨在帮助开发者更高效地掌握和应用枚举类型的数据传递,适合对Spring Boot感兴趣的读者深入学习。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 在Cisco IOS XR系统中,存在提供服务的服务器和使用这些服务的客户端。本文深入探讨了进程与线程状态转换机制,分析了其在系统性能优化中的关键作用,并提出了改进措施,以提高系统的响应速度和资源利用率。通过详细研究状态转换的各个环节,本文为开发人员和系统管理员提供了实用的指导,旨在提升整体系统效率和稳定性。 ... [详细]
  • 本文深入解析了WCF Binding模型中的绑定元素,详细介绍了信道、信道管理器、信道监听器和信道工厂的概念与作用。从对象创建的角度来看,信道管理器负责信道的生成。具体而言,客户端的信道通过信道工厂进行实例化,而服务端则通过信道监听器来接收请求。文章还探讨了这些组件之间的交互机制及其在WCF通信中的重要性。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 本文探讨了如何利用Java代码获取当前本地操作系统中正在运行的进程列表及其详细信息。通过引入必要的包和类,开发者可以轻松地实现这一功能,为系统监控和管理提供有力支持。示例代码展示了具体实现方法,适用于需要了解系统进程状态的开发人员。 ... [详细]
  • 深入解析C#中app.config文件的配置与修改方法
    在C#开发过程中,经常需要对系统的配置文件进行读写操作,如系统初始化参数的修改或运行时参数的更新。本文将详细介绍如何在C#中正确配置和修改app.config文件,包括其结构、常见用法以及最佳实践。此外,还将探讨exe.config文件的生成机制及其在不同环境下的应用,帮助开发者更好地管理和维护应用程序的配置信息。 ... [详细]
author-avatar
你是我的小心肝啊哈哈
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有