RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。
2. RDD的创建
Spark Core为我们提供了三种创建RDD的方式,包括:
使用程序中的集合创建RDD
使用HDFS文件创建RDD
2.1 Spark初始化
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息,比如下面的代码:
SparkConf cOnf=new SparkConf();
conf.set("参数", "参数值"); //因为jvm无法获得足够的资源
JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
2.2 使用程序中的集合创建RDD
List data = Arrays.asList(1, 2, 3, 4, 5);
//并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
JavaRDD distData = sc.parallelize(data);
2.3 使用HDFS文件创建RDD
//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
JavaRDD lines = sc.textFile("hdfs://master:9000/testFile/README.md");
3. RDD的两种算子
RDD支持两种类型的操作算子:Transformation与Action。
3.1 Transformation
Transformation操作会由一个RDD生成一个新的 RDD。Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到Actions操作时,才真正开始运算。
3.2 Action
Action操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。
例如,first() 就是的一个行动操作,它会返回 RDD 的第一个元素。
result = testlines.first()
transformations操作和Action操作的区别在于Spark计算RDD 的方式不同。对于在任何时候transformations得到的新的RDD,Spark只会惰性计算。只有在一个Action操作中用到时,才会真正计算。这种策略也是spark性能高的部分原因。
比如,我们读取一个文本文件创建一个RDD,然后把其中包含spark的行筛选出来。如果Spark在我们运行lines = sc.textFile(test.txt) 时就把文件中所有的行都读取到内存中并存储起来,内存开销会很大,而我们接下来的操作会筛选掉其中的很多数据。相反, 如果Spark 在知道了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。
事实上,在执行行动操作 first()时,Spark也只是扫描文件直到找到第一个匹配的行为止,而不是读取整个文件。
3.3 RDD常用算子
Spark32个常用算子总结
Spark常用算子详解
4. RDD的持久化机制
blog.csdn.net/weixin_3560…
RDD还有一个叫持久化的机制,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速。缓存是用Spark构建迭代算法的关键。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。在Spark应用程序的调优中就会考虑到RDD的持久化的机制。
4.1 RDD持久化机制
Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition,这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD
要持久化一个RDD,只要调用其cache()或者persist()方法即可。但是并不是这两个方法被调用时立即缓存,在该RDD第一次被计算出来时(触发后面的action时),该RDD将会被缓存在计算节点的内存中,并供后面重用。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。
4.2 RDD的持久化的级别
顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:
object StorageLevel {
val NOnE= new StorageLevel(false, false, false, false)
val DISK_OnLY= new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_OnLY= new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
......
}
查看其构造参数
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {
......
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
......
}
可以看到StorageLevel类的主构造器包含了5个参数:
useDisk:使用硬盘(外存)
useMemory:使用内存
useOffHeap:使用堆外内存,这是 Java 虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。序列化方式存储对象可以节省磁盘或内存的空间,一般 序列化:反序列化=1:3
replication:备份数(在多个节点上备份)
理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。例如:
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)
注意:持久化的单位为Partition
注意:当使用RDD的MEMORY_ONLY进行持久化的时候,当内存空间不够的时候,不会报OOM,它会选择最小的partiton来持久化在内存,当重新的使用RDD时候,其他的partition会根据依赖关系重新计算
4.3 选择持久化级别
Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:
如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。
如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
如果你希望更快的错误恢复,可以利用重复存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
它运行多个执行者共享Tachyon中相同的内存池
它显著地减少垃圾回收的花费
如果单个的执行者崩溃,缓存的数据不会丢失
4.4 checkPoint
当业务场景非常的复杂的时候,RDD的lineage(血统)依赖会非常的长,一旦血统较后面的RDD数据丢失的时候,Spark会根据血统依赖重新的计算丢失的RDD,这样会造成计算的时间过长,Spark提供了一个叫checkPoint的算子来解决这样的业务场景。
为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。
4.4.1 checkPoint优点
持久化在HDFS上,HDFS默认的3副本备份使得持久化的备份数据更加的安全
切断RDD的依赖关系:当业务场景复杂的时候,RDD的依赖关系非常的长的时候,当靠后的RDD数据丢失的时候,会经历较长的重新计算的过程,采用checkPoint会转为依赖checkPointRDD,可以避免长的lineage重新计算。
4.4.2 checkPoint的原理
当finalRDD执行Action类算子计算job任务的时候,Spark会从finalRDD从后往前回溯查看哪些RDD使用了checkPoint算子
将使用了checkPoint的算子标记
Spark会自动的启动一个job来重新计算标记了的RDD,并将计算的结果存入HDFS,然后切断RDD的依赖关系
建议在执行checkpoint()方法之前先对rdd进行persisted操作。 在checkPoint的RDD之前先cache RDD,那么Spark就不用启动一个job来计算checkPoint的RDD,而是将持久化到内存的数据直接拷贝到HDFS,进而提高Spark的计算速度,提高应用程序的性能
5. 共享变量
Spark共享变量(广播变量、累加器)
Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)
累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。
5.1 广播变量
有时会变量是在driver端创建的,但是因为需要在excutor端使用,所以driver会把变量以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个变量,如果这个变量非常大的时候,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。
使用广播变量后:
int factor = 3;
final Broadcast factorBroadcast = sc.broadcast(factor);
...
//在RDD计算中
int factor = factorBroadcast.value();
另外,为了确保所有的节点获得相同的变量,对象factorBroadcast广播后只读不能够被修改。
注意事项:
5.2 累加器
Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对一个变量并行操作的功能。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
final Accumulator sum = sc.accumulator(0);
...
//RDD计算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());
累加器只能由Spark内部进行更新,并保证每个任务在累加器的更新操作仅执行一次,也就是说重启任务也不应该更新。在转换操作中,用户必须意识到任务和作业的调度过程重新执行会造成累加器的多次更新。
累加器同样具有Spark懒加载的求值模型。如果它们在RDD的操作中进行更新,它们的值只在RDD进行行动操作时才进行更新。