热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark作业调度原理

概述        spark有多种方式调度各个计算所需的资源.         首先,每个application(即sparkContext实例)

概述

         spark有多种方式调度各个计算所需的资源.

         首先,每个application(即sparkContext实例)有一组独立的Executor进程。各种cluster manager(yarn Resource manager,standalone master等等)为application间的调度提供自己的方式。

         其次,一个application内的各个job(spark action实例)也可能并行执行,前提是各个job是在独立的线程中提交。这种情况在处理网络请求的场景下是正常的,如shark。spark提供了fairscheduler来调度application内的job。

 

调度application

         每个spark application有一组专有的executor jvm,这组jvm只用来跑这个application的任务,以及只存储这个application的数据。如果是多用户共享集群,不同的cluster manager,会有不同的资源分配(或调度)方式:

standalone模式:默认情况下,提交到集群的application以FIFO方式调度,每个被调度的application会使用所有的集群资源。你可以配置spark.cores.max来控制application的资源使用量;若未设置spark.core.max,也可以通过spark.deploy.defaultCores设置默认值。除了控制core的使用,也可以通过spark.executor.memory指定每个executor的内存使用量。

Mesos:在memos模式下,通过spark.mesos.coarse=true的配置可以启用静态分区(static partition)功能。同standalone模式一样,也可以通过spark.cores.max和spark.executor.memory分别配置core和内存的使用量。

yarn:在yarn模式下,–num-executors控制application使用的executor的数量,–executor-memory 和 –executor-cores分别控制core和内存使用。

 

         对于memos,还可以实现CPU核心动态共享(dynamic sharing of CPU cores)。这种模式下,每个application同样具有固定和独立的内存,只是在计算空闲时,可以把core分配给其他application使用。这种模式在具有大量不是很活跃的application的情况下很有用。此模式的一个缺点是:当一个application需要重新获得属于自己的核心时,可能由于借用core的application正在计算而需要等待,造成不可预测的延迟。开启这种模式,只需要使用memos://URL以及spark.mesos.coarse设置为false(默认值就是false)。

请注意,目前没有一种模式提供application间的内存共享。如果有这种需求,我们推荐使用单个sever application对外提供服务,以便共享一个RDD。shark JDBC就是用这种方式提供SQL查询的。

 

动态资源分配(Dynamic Resource Allocation)

 

         从spark1.2开始,可以根据application的负载动态地增加和减少分配给application的资源。也就是说,你的application在不需要资源的时候会把资源退还给集群,而在需要的时候重新申请获得资源。这在spark集群上有多个application时候很有用。当分配给某个application的资源处于空闲状态,这些资源会退还到集群的资源池从而被其他application使用。spark中的动态资源分配的粒度是executor,通过spark.dynamicAllocation.enabled=true即可开启。

 

         目前这个功能是关闭的,仅在yarn中有效。未来发行版本会在standalone和memos coarse-grained模式应用。虽然memos目前在fine-grained模式下有类似的动态资源共享,但是开启动态资源分配可以减少memos在粗粒度的调度延迟。

 

配置

         所有配置都在spark.dynamicAllocation.*的命名空间下。为了使用动态资源分配特性,application必须设置spark.dynamicAllocation.enabled=true,以及通过spark.dynamicAllocation.minExecutors 和spark.dynamicAllocation.maxExecutors分别设置executor数量的上下界。

         另外,spark应用必须使用独立的shuffle service。这个shuffle service的目的是保存executor产生的文件并提供给后续任务使用,从而使得executor可以安全的移除。要启用独立的shuffle sevice,需要设置spark.shuffle.service.enabled=true。在yarn中,实现该shuffle service的类是 org.apache.spark.yarn.network.YarnShuffleService,该服务会运行在所有的nodemanager。具体启动shuffle service 的步骤如下:

1.编译spark,同时指定YARN profile。

2.找到spark--yarn-shuffle.jar。这是应该是shuffle sevice。如果在编译时候没有指定–tgz,则这个jar包在 $SPARK_HOME/network/yarn/target/scala-目录下;否则就在打包后的发行版的lib目录。

3.拷贝上述jar包到yarn集群的所有nodemanager。

4.在每个nodemanager的yarn-site.xml配置文件里: yarn.nodemanager.aux-services配置项添加spark_shuffle ;配置项yarn.nodemanager.aux-services.spark_shuffle.class添加org.apache.spark.network.yarn.YarnShuffleService。另外设置所有和spark.shuffle.service.* 相关的配置项。

5.重启所有nodemanager。

 

资源分配策略

         从上层看,spark应该在不需要的时候减少executor,在需要的时候动态增加executor。虽然没有确切的方式去预测即将被去掉的executor会马上被重新用到,或者即将被添加的executor马上会空闲,我们需要一些启发式算法来动态增减executor。

请求策略

         开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。

之所以采用指数增长,出于两方面考虑。其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,儿该方式可以在很少次数的申请之后得到满足。

 

删除executor策略

         很简单,当application的executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout后,就会将其删除掉。

华丽的移除executor

         在动态资源分配之前,当application完成或者executor运行出错次数超过限定值时,executor就会安全退出,此时executor的所有状态将不再需要。在动态资源分配情况下,当executor被移除时,application还在执行。如果application要访问被移除executor的状态,则需要重新计算其状态。因此,spark需要一种机制确保移除executor之前保存executor的状态。

         这个需求对shuffle是很重要的。在shuffle过程中,spark executor首先保存它的map结果文件到本地磁盘上,然后以server的身份供其他executor来获取文件。对于某些executor执行比其他executor慢很多的情况,动态资源分配会移除空闲的executor,当后续任务需要被移除executor的结果文件时,就要重新计算结果文件了。

保存已移除executor的结果文件的方式就是使用独立的shuffle server。这个server是一个在所有nodemanager都会长期运行的进程。当开启独立shuffle server时,executor将会从该server直接读取文件而不是相互之间获取文件。这样executor产生的结果文件就会比executor具有更长的生命周期。

         另外,除了executor的shuffle文件,executor还会在磁盘或内存缓存数据。当executor被移除后,这些缓存的数据将不再可用。目前还没有解决方案。

 

 

application内调度

         在一个spark application内,由不同线程提交的各个job可以并行执行,job指的是action以及关联的task。

         默认情况下,spark调度器采用FIFO方式。每个job被分成多个stage,第一个job会得到所有可用的资源并计算,运行完成后第二个job开始运行。如果第一个job不需要所有资源,则第二个job可以马上执行;但是当第一个作业很大的时候,第二个作业则会有很大的延迟。

 

         从spark 0.8版本开始,spark开始支持公平调度job。这样,不同job的tasks以round-robin的方式调度,从而job之间以近似公平的方式被调度。这也意味着一个长时间运行的作业运行时提交一个短作业,则这个短作业也可以在合理的时间内完成。为了使用公平调度,需要设置spark.scheduler.mode=FAIR 。


val
 conf =newSparkConf().setMaster(…).setAppName(…)

conf.set(“spark.scheduler.mode”,“FAIR”)

val sc =newSparkContext(conf)

公平调度池

         公平调度支持把job归类到组,每个池里的job采用不同的调度策略。例如,可以为重要的jobs创建高优先级的池,或者把不同用户的job放到不同的组,然后给用户配置相同的资源量从而不至于某些用户的作业少而得到更少的资源。

         在无任何干预的情况下,job被提交到默认池。可以通过设置spark.scheduler.pool本地属性设置job的池。例如,sc.setLocalProperty(“spark.scheduler.pool”, “pool1”)。

         当设置了这个本地属性后,所有由这个线程提交的job都会进入这个池。清除一个池也很简单:sc.setLocalProperty(“spark.scheduler.pool”, null)。

 

池的默认行为

         默认情况下,各个池会用相同分量的集群资源,但池内jobs采用FIFO调度。例如,每个用户创建了一个池,则这些用户将有等量的集群资源运行自己的job,但是每个用户的作业将按顺序执行,而不会是后面的job在前面的job运行完之前运行。

 

池属性配置

         池的一些属性是可以修改的,每个池包含三个属性:

schedulingMode:FIFO或FAIR。

weight:这个用来控制此池相对于其他池对集群资源的使用量,默认值为1,即所有池平分集群资源。如果配置一个池的weight为2,则分配给该池的资源将是其他池的两倍。

minShare:除了全局的weight,也可以通过设置minShare来保证某个池的最少资源分配量。公平调度器会尽量满足所有池的minShare量,然后才会考虑按weight分配多余的资源。通过minShare可以保证一个池总是能得到一定数量的资源。该属性默认值为0.这个数值应该是core的数量。


推荐阅读
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文探讨了C语言中指针的应用与价值,指针在C语言中具有灵活性和可变性,通过指针可以操作系统内存和控制外部I/O端口。文章介绍了指针变量和指针的指向变量的含义和用法,以及判断变量数据类型和指向变量或成员变量的类型的方法。还讨论了指针访问数组元素和下标法数组元素的等价关系,以及指针作为函数参数可以改变主调函数变量的值的特点。此外,文章还提到了指针在动态存储分配、链表创建和相关操作中的应用,以及类成员指针与外部变量的区分方法。通过本文的阐述,读者可以更好地理解和应用C语言中的指针。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Android源码深入理解JNI技术的概述和应用
    本文介绍了Android源码中的JNI技术,包括概述和应用。JNI是Java Native Interface的缩写,是一种技术,可以实现Java程序调用Native语言写的函数,以及Native程序调用Java层的函数。在Android平台上,JNI充当了连接Java世界和Native世界的桥梁。本文通过分析Android源码中的相关文件和位置,深入探讨了JNI技术在Android开发中的重要性和应用场景。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
author-avatar
游你精彩_980_469
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有