摘要:
本文提出了分布式内存抽象的概念——弹性分布式数据集(RDD,Resilient Distributed Datasets)。它同意开发者在大型集群上运行基于内存的计算。RDD适用于两种应用,而现有的数据流系统对这两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域非经常见。二是交互式数据挖掘工具。这两种情况下。将数据保存在内存中可以极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD在共享状态的时候是基于粗粒度的转换而不是细粒度的更新(换句话说就是,RDD仅仅能通过其它RDD上的批量操作来创建)。
虽然如此,RDD仍然足以用于非常多类型的计算。包含专用的迭代编程模型(如Pregel)等。我们已经将RDD应用到Spark系统之中,并对改性进行了相关的评估。
介绍:
像MapReduce和Dryad这种集群计算框架已经被广泛地应用到了大规模的数据分析中。这些系统提供了高级的运算符(算子),使得用户可以轻松地写出并行计算的程序而不用考虑任务分配问题以及错误容忍的细节。
虽然眼下的框架都提供使用集群计算资源的高级抽象,可是使用分布式内存却没有相应的类似的高级抽象。假设不能非常好地使用内存。那么,当一些应用的数据结果须要多次复用的情况出现的时候,运行效率就会受到影响。这些应用一般包含:
(1)迭代式机器学习和图应用中经常使用的迭代算法(每一步对数据运行相似的函数),比方pagerank、K-means聚合、逻辑回归等算法。
(2)交互式数据挖掘工具(用户重复查询一个数据子集)。
当前的框架採取的方法是将须要复用的数据存储到存储系统中(HDFS等),然后在每次查询时又一次载入,那么,在数据复制、磁盘I/O、序列化的过程中会产生较大的开销,而这些开销会支配整个任务运行的时间。
针对这个问题。我们提出了RDD,它能够为大部分的应用提供高效的数据复用。RDD是一个具有容错机制,并行数据架构,能够让用户将中间结果保存在内存中。并通过控制RDD的数据分区进而优化数据部署,还提供丰富的算子来操纵RDDs。
就错误容忍而言,如今的集群系统採取的方式通常是提供细粒度的更新操作:数据备份或者是通过log处理进行数据恢复。对于数据密集型的任务来说。由于要进行大量的数据拷贝,network的带宽压力非常大(network的带宽小于RAM(内存cache)的带宽),且文件系统负载也会加大。
相比而言,RDD提供了一种粗粒度的转换方式来解决容物容忍的问题。RDD通过Lineage(包括了怎样从其它RDD衍生所必需的相关信息),从而不须要检查点操作就能够重构丢失的数据分区。
我们通过微基准和用户应用程序来评估RDD。实验表明。在处理迭代式应用上Spark比Hadoop快高达20多倍,计算数据分析类报表的性能提高了40多倍,同一时候可以在5-7秒的延时内交互式扫描1TB数据集。此外。我们还在Spark之上实现了Pregel和HaLoop编程模型(包含其位置优化策略),以库的形式实现(分别使用了100和200行Scala代码)。
RDDs
RDD是仅仅读的、分区记录的集合。RDD仅仅能通过稳定物理存储中的数据集和其它已有的RDD上的确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join等。
RDD始终不须要物化。RDD含有怎样从其它RDD衍生(即计算)出本RDD的相关信息(即Lineage信息)。据此能够从物理存储的数据计算出对应的RDD分区。
最后要说明的是,用户能够控制的RDDs的其它两个方面,持久化和划分:
(1)用户能够指定须要复用的RDDs并选择一个基于内存的存储策略。用户能够请求将RDD缓存,这样执行时将已经计算好的RDD分区存储起来,以加速后期的重用。相关策略:缓存的RDD默认存储在内存中,但假设内存不够,能够写到磁盘上;用户也能够使用其它的存储策略。比方只将RDD存储在磁盘等等;用户还能够在每一个RDD上面指定优先级来确定最先spill到磁盘的RDD。
(2)用户能够就RDD每条记录里面的key进行划分。这有助于部署的优化。比如,我们能够将
两个数据集用相同的方式进行 hash-partitioned以便于之后的join操作(事先经过key进行了划分)。
- Spark Programming Interface
在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)的调用在其上进行转换操作。
程序猿一開始通过在存储层上的数据“转换”(map、filter等)定义一个或者多个RDDs。即transformations算子操作过程
。之后。程序猿就能够在“动作”(actions)中使用RDD了。动作是向应用程序返回值。或向存储系统导出数据的那些操作,比如,count(返回RDD中的元素个数)。collect(返回元素本身),save(将RDD输出到存储系统)。
在Spark中,仅仅有在
RDD上第一次运行actions 操作时,才会真正開始计算RDD(即延迟计算)。
【补充】每一个运算(如flatMap、map)事实上返回的都是一个RDD对象。每一个RDD对象都有一个Parent,通过这个Parent,实际上我们把一个个RDD对象串联了起来!能够觉得最后形成了一个RDD对象的队列。直到最后须要计算时(调用了action算子,后调用runjob函数)才開始逐一调用各个RDD对象的compute方法。完毕实际的运算。
Example: Console Log Mining(控制台的日志挖掘)
本部分我们通过一个详细演示样例来阐述RDD。假定有一个大型站点出错,操作员想要检查Hadoop文件系统(HDFS)中的日志文件(TB级大小)来找出原因。通过使用Spark。操作员仅仅需将日志中的错误信息装载到一组节点的内存中,然后运行交互式查询就可以。
首先,须要在Spark解释器中输入例如以下Scala命令:
lines = spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR"))
errors.cache()
第1行从HDFS文件定义了一个RDD(即一个文本行集合),第2行获得一个过滤后的RDD,第3行请求将errors缓存起来。注意在Scala语法中filter的參数是一个闭包。
errors.count()
用户还能够在RDD上运行很多其它的转换操作。并使用转换结果,如
// Count errors mentioning MySQL
errors.filter(_.contains("MySQL")).count()
// Return the time fields of errors mentioning
// HDFS as an array (assuming time is field
// number 3 in a tab-separated format):
errors.filter(_.contains("HDFS"))
.map(_.split('\t')(3))
.collect()
使用errors的第一个action执行以后。Spark会把errors的分区缓存在内存中(action之后才会将数据载入到内存)。极大地加快了兴许计算速度。注意,最初的RDD
lines由于错误信息可能仅仅占原数据集的非常小一部分(小到足以放入内存)。
最后。为了说明模型的容错性,下图给出了上述查询过程的的Lineage图。在RDD lines上运行filter操作,得到errors。然后再filter、map后得到新的RDD。在这个RDD上运行collect操作。Spark调度器以流水线的方式运行后两个转换,向拥有errors分区缓存的节点发送一组任务。此外。假设某个errors分区丢失,Spark仅仅在对应的lines分区上运行filter操作(“血统”信息记录操作过程)来重建该errors分区。
Discussion
- Expressing Existing Programming Models
RDD能够高效地实现眼下已经被提议独立的集群编程模型。所谓高效。就是RDDs不只能够实现跟这些模型一样的输出。还能够优化这些框架的性能。比方将特定的数据保存在内存。将RDD进行分区进而实现节点交互的优化。以及进行高效的数据恢复。这些能够使用RDDs进行实现的模型包含:
MapReduce
该模型能够使用Spark里面的flatMap和groupByKey算子来实现。或者是在有combiner的前提下用reduceByKey来实现。
DryadLINQ
该系统相对于MapReduce会提供更加广泛的算子,这些算子基本上都是Spark中的的总体计算转换算子,比方(map, groupByKey, join等
SQL
类似于DryadLINQ。SQL在记录集上运行数据并行查询的操作。
Pregel
Google的Pregel是专门用于图处理应用的模型,这个模型看起来跟其它面向数据集的程序模型非常不一样。在Pregel
(基于Bulk Synchronous
Parallel。总体同步并行计算模型)中。程序由一系列超步(Superstep)协调迭代执行在每一个超步中,各个顶点执行用户函数,并更新对应的顶点状态。改变图的拓扑结构。然后向下一个超步的顶点集发送消息。这样的模型可以描写叙述非常多图算法,包含最短路径,双边匹配和PageRank等。
使用RDDs来完毕上述模型的主要原因是Pregel对全部定点的每一次迭代中使用了相似的用户程序。因此,我们能够将一个RDD中每一次迭代的定点状态信息进行存储。然后用该用户函数运行总体转换(flatmap)并将相关信息存储在新生成的RDD里面。我们能够将这个新生成的RDD和定点的状态进行join操作来进行定点的信息交换更新操作。
另一点非常重要,RDDs能够同意我们像Pregel一样将定点状态信息存储在内存中,这样就能够通过控制RDD的分区来优化集群节点之间的交互了。也能够支持部分的错误回复。
Iterative MapReduce
近期的好几个系统,包含Hadoop和Twister都提供了交互式的MapReduce模型。在该模型中。用户能够给系统一系列的job来迭代运行。此系统能够通过迭代连续地进行数据划分,且Twister能够将数据保存在内存其中。
Batched Stream Processing
研究人员近期提议了好几个增量的处理系统,这些系统都能够周期性地更新数据。比如,一个每15分钟对广告点击统计进行更新的应用,该应用能够将之前15分钟的状态和最新的log数据进行合并从而得到最新的状态。这些系统运行和
Dryad类似的
总体操作。可是会在分布式文件系统中存储状态。将中间状态放入RDDs中能够加速处理过程。
Explaining the Expressivity of RDDs
为什么RDDs能够实现各种程序模型?由于附加在RDD上面的限制差点儿不会影响到大多数的并行应用。特别地,尽管RDDs只只能通过总体的转换操作来创建,可是非常多的并行程序自然地会对记录採用同样的操作(适合总体操作的模式)。
相似地,RDDs的不可变性也不是什么障碍。由于我们能够创建多个RDD来来代表同样数据集的不同版本号。
其实,眼下使用文件系统的非常多的MapReduce应用都不支持文件的更新操作。
- Leveraging RDDs for Debugging
当我们最開始设计RDDs进行广义地(“血统”)针对错误容忍的重计算的时候。这个属性也有助于debug。特别地,通过记录一个job期间创建的“血统”信息,我们之后能够又一次计算这些RDDs,也能够让用户交互式地查询这些信息。进而能够在一个单进程的debugger过程中通过又一次计算相应的RDD分区来又一次执行来自一个job的不论什么的task(有针对性地获取“血统”信息。并有针对性地对单个partition进行计算来针对性地重执行单个task)。不像传统分布式系统那样通过又一次执行app来进行bug调试的调试者。必须在较多的nodes间搞清楚事件的执行顺序。相对而言,RDDs的debug过程没有添加不论什么负载,由于仅仅有RDD的“血统”图须要被记录,我们眼下也正基于上述的观点开发一个Spark调试器。
Related Work
- Cluster Programming Models
眼下的集群模型分为好几类:
(1)数据流模型。像MapReduce、Dryad、Ciel那样拥有一系列处理数据的算子,可是是使用稳定的存储系统来进行数据的共享的。
相对于那些稳定的存储系统,RDDs代表一种更加高效的数据共享抽象,由于RDDs避免了数据复制、I/O以及序列化的开销。
(2)拥有高级程序接口的数据流系统。比方DryadLINQ、
FlumeJava等,这些系统会提供相关的API使得用户能够通过map和join算子进行并行的数据获取。
然而,在这些系统中,并行的数据获取对象能够是磁盘中的文件或者是用于查询过程的临时的数据集。且系统会通过同样的算子来进行流水线式的数据处理(比方一系列的map操作),可是同样的算子的处理过程中,数据共享的效率太低。
为了更加便利,我们在并行获取数据的过程中以正常的Spark API为基础。可是在口后背后会提供RDDs作为存储抽象。
(3)为特定的须要数据共享的应用提供高级接口的系统。比如,Pregel支持迭代的图应用。而
Twister和
HaLoop是处理迭代式的MapReduce的。
然而,这些系统会为他们支持的计算模块提供数据共享。不会为用户提供普遍的数据共享的抽象。比如。用户无法使用Pregel或者Twister来将一个数据集载入到内存。并决定哪个数据查询的操作能够在上面运行。
可是RDDs提供了一个分布式存储抽象而且能够支持哪些专门的系统无法支持的应用,比方交互式数据挖掘。
(4)暴露了可分享的可变的状态的一些系统。比如。Piccolo让用户能够执行一些并行函数来读取或者更新一个分布式hash表的元素。DSM(Distributed
shared memory )以及key-value存储(比方RAMCloud提供了相似的模型)也是类似的。
RDDs与上述系统有两个方面的不同。第一。RDD基于算子map、sort和join等提供了高级编程接口,然而,Piccolo和DSM只只能读取以及更新hash表中的元素。第二,Piccolo和DSM系统通过检查点以及回滚操作来运行数据恢复,其开销要比RDDs的基于“血统”的错误容忍策略大非常多。
Caching Systems
Nectar可以通过识别带有程序分析的子表达式,跨DryadLINQ作业重用中间结果。这样的能力将会增加到基于RDD的系统。
可是Nectar并没有提供In-Memory缓存(使用一般的分布式文件系统)。也不可以让用户显式地控制应该缓存那个数据集,以及怎样对其进行分区。Ciel和FlumeJava相同可以记住任务结果,但不能提供In-Memory缓存并显式控制它的缓存方式。
Ananthanarayanan系统通过利用数据存取的暂时性和空间本地性。进而添加了一个基于内存缓存的分布式文件系统。这样的方法提供了针对文件系统更快的数据存取速度,可是和共享中间结果的RDDs比起来效率还是不行,由于该系统还是须要应用将stage操作之间的结果写入文件系统进而共享数据。
Lineage
在科学计算和数据库领域。表示数据的Lineage和原始信息一直以来都是一个研究课题。对于一些应用。如须要解释结果以及同意被又一次生成、工作流中发现了bug或者数据集丢失须要又一次计算数据。RDD提供了一个并行的编程模型。在这个模型中使用的“血统”机制来表示是很easy的,因此它能够被用于容错。
基于“血统”的恢复机制事实上与MapReduce以及Dryad的恢复机制是类似的,也会追踪tasks构成的DAG图的依赖关系。然而在这些系统中,“血统”信息会在一个job结束之后丢失,最后还是须要採用在存储系统进行复制备份的方法来共享计算的数据。相反,RDDs用"血统"机制高效地将
数据缓存至内存而不须要复制和I/O的开销。
Relational Databases
从概念上看,RDD类似于数据库中的视图,RDD的持久化过程类似于视图的物化。然而,像DSM系统一样,数据库同意典型地读写全部记录,通过操作日志和数据的日志来实现容错。还须要花费额外的开销来维护一致性。RDD编程模型粗粒度的转换模型能够避免这些开销。
Conclusion
我们提出的RDD是一个高效的,通用的,具有错误容忍的应用于共享数据的集群应用的抽象。RDD广泛支持并行应用,还包含已经提出来的非常多专门的程序模型处理的迭代计算以及那些专门的程序模型不支持的新应用。不像已经存在的通过数据复制进行容错处理的集群存储抽象。RDD基于粗粒度的转换操作提供了一些列的API,这些API可以基于“血统”机制进行高效率的数据恢复。
使用RDD的Spark系统实现处理迭代式作业的速度超过Hadoop大约20倍,并且还可以交互式查询数百GB的数据。
[Berkeley]弹性分布式数据集RDD的介绍(RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 论文翻译)