RDD是Spark最基本也是最根本的数据抽象,它具备像MapReduce等数据流模型的容错性,并且允许开发人员在大型集群上执行基于内存的计算。现有的数据流系统对两种应用的处理并不高效:一是迭代式算法;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。
一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。我们面向的是大规模数据分析,数据检查点操作成本很高:需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源(在内存中复制数据可以减少需要缓存的数据量,而存储到磁盘则会降低应用程序速度)。所以,我们选择记录更新的方式。但是,如果更新太多,记录更新成本也不低。因此,RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列转换记录下来(即Lineage),以便恢复丢失的分区。
理解RDD的容错性
系谱图+检查点。
RDD可以通过系谱图机制来进行RDD的恢复。在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来生成丢失的RDD数据。
但是在spark计算里面,假如计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果。如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)。
可以通过两种方式创建RDD:1)由一个已经存在的Scala集合创建。2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase、Amazon S3等。
5,RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。为了避免缓存丢失重新计算带来的开销,Spark又引入了检查点(checkpoint)机制。
RDD检查点(Checkpoint)是Spark Core计算过程中的容错机制。通过将RDD的数据与状态持久化,一旦计算过程出错,就可以从之前的状态直接恢复现场,而不必从头重算,大大提高了效率与可靠性。
缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。而检查点不同,它是在计算完成后,重新建立一个Job来计算。为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。
首先,根据依赖关系的不同将DAG划分为不同的阶段(Stage)。对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个执行阶段;对于宽依赖,由于Shuffle的存在,只能在parent RDD(s)Shuffle处理完成后,才能开始接下来的计算,因此宽依赖就是Spark划分Stage的依据,即Spark根据宽依赖将DAG划分为不同的Stage。在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的。Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的。也就是说,Stage只有在它没有parent Stage或者parent Stage都已经执行完成后,才可以执行。
7,reduceByKey时会触发一个Shuffle(洗牌)的过程。在Shuffle开始之前,有一个本地聚合的过程,比如第三个分片的(e,1)(e,1)聚合成了(e,2)。Shuffle的结果是为下游的Task生成了三个分片,这三个分片就构成了ShuffledRDD。之后在做了一个聚合之后,就生成了结果的RDD。
1)org.apache.spark.scheduler.ShuffleMapTask
2)org.apache.spark.scheduler.ResultTask
简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行,执行过程的实现在org.apache.spark.executor.Executor.TaskRunner#run。
9,Scheduler(任务调度)模块作为Spark Core的核心模块之一,主要包含两大部分,即DAGScheduler和TaskScheduler。
DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage(阶段),其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。而且DAG在不同的资源管理框架(即部署方式,包括Standalone、Mesos、YARN、Local、EC2等)下的实现是相同的。
在DAGScheduler将这组Task划分完成后,会将这组Task提交到TaskScheduler。
TaskScheduler通 过Cluster Manager在 集 群 中 的 某 个Worker的Executor上启 动任务。在Executor中运行的任务,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回传到Driver或者保存在本地。在不同的资源管理框架下, TaskScheduler的实现方式是有差别的,但是最重要的实现是org.apache.spark.scheduler. TaskSchedulerImpl。对于Local、Standalone和Mesos来说,它们的TaskScheduler就是TaskSchedulerImpl;对于YARNCluster和YARN Client的TaskScheduler的实现也是继承自TaskSchedulerImpl。
DAGScheduler SchedulerBackend TaskScheduler
org.apache.spark.scheduler.SchedulerBackend是一个trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的Task分配计算资源(即Executor),并且在分配的Executor上启动Task,完成计算的调度过程。它使用reviveOffers完成上述的任务调度。reviveOffers可以说是它最重要的实现。
org.apache.spark.scheduler.TaskScheduler也是一个trait,它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接收不同Stage的任务,并且向集群提交这些任务,并为执行特别慢的任务启动备份任务。TaskScheduler是以后实现多种任务调度器的基础。org.apache.spark.scheduler.TaskSchedulerImpl是唯一实现。TaskSchedulerImpl会在以下几种场景下调用org.apache.spark.scheduler.Scheduler-Backend#reviveOffers:
1)有新任务提交时。2)有任务执行失败时。3)计算节点(即Executor)不可用时。4)某些任务执行过慢而需要为它重新分配资源时。
每个SchedulerBackend都会对应一个的唯一TaskScheduler,而它们都被Spark-Context创建和持有。
实际上是调用org.apache.spark.SparkContext#runJob来进行提交Job的:SparkContext实现了很多runJob,这些函数的区别就是调用参数的不同,不过这些runJob最后都会调用DAGScheduler的runJob:
12,TaskScheduler和DAGScheduler都是在SparkContext创建的时候创建的。其中,Taskcheduler是通过org.apache.spark.SparkContext#createTaskScheduler创建的。而DAGScheduler是直接调用的它的构造函数创建。只不过,DAGScheduler保存了TaskScheduler的引用,因此需要在TaskScheduler创建之后进行创建。
13,每 个TaskScheduler都 对 应 一 个SchedulerBackend。其 中,TaskScheduler负责Application的不同Job之间的调度,在Task执行失败时启动重试机制,并且为执行速度慢的Task启动备份的任务。而SchedulerBackend负责与Cluster Manager交互,取得该Application分配到的资源,并且将这些资源传给TaskScheduler,由TaskScheduler为Task最终分配计算资源。
容错(fault tolerance)指的是在一个系统的部分模块出现错误的情况下还能持续地对外提供服务;如果出现了服务质量的下降,这个下降也是和出错的严重性成正比的。对于没有容错的系统,即使一个微小的错误也可能会导致整个服务停止。
提到容错,不得不提一下容灾(或者称为灾难恢复,disaster recovery)。容灾技术是通过在异地建立和维护一个备份系统,利用地理上的分散性来保证数据对于灾难性事件的抵抗能力。容灾系统在实现中可以分为两个层次:数据容灾和应用容灾。数据容灾指建立一个异地的数据系统,作为本地关键应用数据的一个备份。应用容灾是在数据容灾的基础上,在异地建立一套完整的和本地生产系统相同的备份应用系统(可以是互为备份)。在灾难情况下,由远程系统迅速接管业务运行。容错和容灾都是为了实现系统的高可用性,容错是在系统的部分模块出现问题时的错误恢复机制;容灾则是在整个系统的层面,通过使用数据和应用的镜像,来实现服务的高可用性的。
接下来将从Master、Worker和Executor的异常退出出发,讨论Spark是如何处理的。
Master容错
Master出错,Spark 有四种恢复策略:ZOOKEEPER,FILESYSTEM,CUSTOM,NONE,可以在配置文件 Spark-env.sh 配置项 spark.deploy.recoveryMode 进行设置,默认为 NONE。
Standalone集群模式下的Master容错是通过ZooKeeper来完成的,即有多个Master,一个角色是Active,其他的角色是Standby。当处于Active的Master异常时,需要重新选择新的Master,通过ZooKeeper的ElectLeader功能实现,可以参考Paxos。
要使用ZooKeeper模式,需要在conf/spark-env.sh中为SPARK_DAEMON_JAVA_OPTS添加一些选项
Master还支持一种更简单的单点模式下的错误恢复,即当Master进程异常时,重启Master进程并从错误中恢复。具体方法是设置spark.deploy.recoveryMode属性的值为FILESYSTEM,并为spark.deploy.recoveryDirectory属性设置一个本地目录,用于存储必要的信息来进行错误恢复
Executor容错
Worker容错
Client,它的任务其实是由AppClient和DriverClient共同完成的。AppClient是一个允许app(Client)和Spark集群通信的中间人,接受master URL、app的信息、一个集群事件的监听器以及事件监听的回调函数,主要和Master交互App相关的信息,DriverClient主要用于和Master交互Driver相关的信息,比如启动、停止及运行状况等
之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。
在Spark 1.0以前,Spark只支持Hash Based Shuffle。因为在很多运算场景中并不需要排序,因此多余的排序只能使性能变差,比如Hadoop的Map Reduce就是这么实现的,也就是Reducer拿到的数据都是已经排好序的。实际上Spark的实现很简单:每个Shuffle Map Task根据key的哈希值,计算出每个key需要写入的Partition然后将数据单独写入一个文件,这个Partition实际上就对应了下游的一个Shuffle Map Task或者Result Task。因此下游的Task在计算时会通过网络(如果该Task与上游的Shuffle Map Task运行在同一个节点上,那么此时就是一个本地的硬盘读写)读取这个文件并进行计算。
缺点:
1)每个节点可能会同时打开多个文件,每次打开文件都会占用一定内存。假设每个Write Handler的默认需要100KB的内存,那么同时打开这些文件需要50GB的内存,对于一个集群来说,还是有一定的压力的。尤其是如果Shuffle Map Task和下游的Task同时增大10倍,那么整体的内存就增长到5TB。
2)从整体的角度来看,打开多个文件对于系统来说意味着随机读,尤其是每个文件比较小但是数量非常多的情况。而现在机械硬盘在随机读方面的性能特别差,非常容易成为性能的瓶颈。如果集群依赖的是固态硬盘,也许情况会改善很多,但是随机写的性能肯定不如顺序写的。
为了解决Shuffle过程产生文件过多的问题,在Spark 0.8.1中加入了ShuffleConsolidate Files机制。它的主要目标是减少Shuffle过程产生的文件。若使用这个功能,则需要将spark.shuffle.consolidateFiles设置为true。在Spark 1.2.0中,这个功能还是没有成为默认选项。官方给出的说法是这个功能还欠稳定。接下来介绍这个机制的实现原理。对于运行在同一个Core的Shuffle Map Task,第一个Shuffle Map Task会创建一个文件;之后的就会将数据追加到这个文件上而不是新建一个文件。因此文件数量就从number(shuffle_map_task)*number(following_task)变 成 了number(cores)*number (following_task)。当然,如果每个Core都只运行一个Shuffle Map Task,那么就和原来的机制一样了。但是Shuffle Map Task 明显多于Core数量或者说每个Core都会运行多个Shuffle Map Task,所以这个实现能够显著减少文件的数量。相当于每个core为每个reduce task创建一个文件。
缺点:
Shuffle Consolidate的机制虽然在某些场景下缓解了Shuffle过程产生文件过多的问题,但是还没有彻底解决7.1.2节中提出的两个问题,即同时打开多个文件造成Writer Handler内存使用过大的问题和产生多个文件造成的随机读从而影响性能的问题。
Spark采用了一个很聪明的做法,即建立另外一套Shuffle机制,让用户自己选择。Spark 1.0建立了Shuffle Pluggable的框架,通过实现一个接口来很容易地实现第三方的external Shuffle Service。而Spark 1.1实现了Sort Based Shuffle;而Spark1.2里,Sort Based Shuffle取代Hash Based Shuffle成为Shuffle的默认选项。
为了更好地解决这个问题,Spark 1.1 引入了Sort BasedShuffle。首先,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个Index文件,Reducer可以通过这个Index文件取得它需要处理的数据。避免产生大量文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件给系统带来的压力。
Shuffle Map Task会按照key相对应的Partition ID进行Sort,其中属于同一个Partition的key不会Sort。因为对于不需要Sort的操作来说,这个Sort是负收益的;要知道之前Spark刚开始使用Hash Based的Shuffle而不是Sort Based就是为了避免Hadoop Map Reduce对于所有计算都会Sort的性能损耗。对于那些需要Sort的运算,比如sortByKey,这个Sort在Spark 1.2.0里还是由Reducer完成的。
17 groupbykey、reducebykey以及aggregateByKey
groupbykey是全局聚合算子,将所有map task中的数据都拉取到shuffle中将key相同的数据进行聚合,它存在很多弊端,例如:将大量的数据进行网络传输,浪费大量的资源,最重要的是如果数据量太大还会出现GC和OutOfMemoryError的错误,如果数据某个key的数据量远大于其他key的数据,在进行全局聚合的时候还会出现数据倾斜的问题。
所以在实际的项目中要谨慎使用groupbykey,随着数据量的增加,groupbykey暴露出来的问题会越来越多。
reducebykey是在map阶段进行本地聚合以后才会到shuffle中进行全局聚合,相当于是进入shuffle之前已经做了一部分聚合,那么它的网络传输速度会比groupbykey快很多而且占用资源也会减少很多,但是算子本身就如它的名字一样,主要是进行计算的将相同key的数据进行计算,返回计算结果。
但是实际的项目中在进行聚合之后我们不一定只是要计算,还会找聚合后某个字段的最大值,最小值等等操作,groupbykey聚合后返回的是(K,Iterable[V]),我们可以把iterable[V]这个集合的数据进行二次处理来实现我们实际的项目需求,但是上面已经提到了groupbykey的诸多问题,reducebykey也是只有在单纯的对数据进行计算的时候才能和groupbykey有等价效果。既想像reducebykey那样进行本地聚合,又想像groupbykey那样返回一个集合便于我们操作。aggregatebykey和reducebykey一样首先在本地聚合,然后再在全局聚合。它的返回值也是由我们自己设定的。
18 Spark内存溢出
(1)driver端的内存溢出
可以增大driver的内存参数:spark.driver.memory (default 1g)
这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
(2)map过程产生大量对象导致内存溢出
这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有shuffle的过程。
(3)数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。
(4)shuffle后内存溢出
shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。
(5)standalone模式下资源分配不均匀导致内存溢出
在standalone的模式下如果配置了–total-executor-cores 和 --executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
(6)使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。