本文主要介绍Spark Core的核心内容:RDD。包含以下章节和对应的内容
章节 | 内容 |
1 | RDD简介 |
2 | RDD分区 |
3 | RDD的依赖关系 |
4 | RDD的缓存机制和区别 |
5 | RDD创建的两种方式 |
6 | RDD算子和总结 |
7 | RDD 算子操作案例 |
1、RDD简介
RDD(Resilient Distributed Datasets,弹性分布式数据集),是Spark最为核心的概念,自然也是理解Apache Spark 工作原理的最佳入口之一。
RDD的特点:
- 是一个分区的只读记录的集合;
- 一个具有容错机制的特殊集;
- 只能通过在稳定的存储器或其他RDD上的确定性操作(转换)来创建;
- 可以分布在集群的节点上,以函数式操作集合的方式,进行各种并行操作
RDD之所以为“弹性”的特点
- 基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错);
- Task如果失败会自动进行特定次数的重试(默认4次);
- Stage如果失败会自动进行特定次数的重试(可以值运行计算失败的阶段),只计算失败的数据分片;
- 数据调度弹性:DAG TASK 和资源管理无关;
- checkpoint;
- 自动的进行内存和磁盘数据存储的切换
2、RDD的分区
首先,RDD是一个逻辑概念,分区是一个物理概念,并且带有下标
举例:通过SparkContext.parallelize创建一个RDD:
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3) # 后面的数字3 表示3个分区
也就是说,RDD 内部的数据集合在逻辑上(以及物理上)被划分成多个小集合,这样的每一个小集合被称为分区。
那么引出两个问题:
- 如何查看分区运行在哪个Worker(机器)上?
- 如何查看每个分区中的数据?
针对问题1和2: 在源码级别,RDD 类内存储一个 Partition 列表。每个 Partition 对象都包含一个 index 成员,通过 RDD 编号 + index 就能从唯一确定分区的 Block 编号,持久化的 RDD 就能通过这个 Block 编号从存储介质中获得对应的分区数据。
3、RDD的依赖关系
RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
- 窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
- 宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition
4、RDD的缓存机制和区别
RDD缓存机制有两种,cache和pesist,两种区别如下:
- cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
- cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;3)
- executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数
5、RDD创建的两种方式
1:通过SparkContext.parallelize创建
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3) # 3个分区
2:通过读取外部的数据源创建:比如:HDFS、本地目录
val rdd1 = sc.textFile("hdfs://bigdata:9000/input/data.txt") # HDFS文件
val rdd2 = sc.textFile("/root/temp/data.txt") # 本地文件
6、RDD算子
RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
从大方向来说:RDD算子分为Transformation算子 和Action 算子,其中Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。Action 算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark系统
从小方向来说:RDD 算子大致可以分为以下三类:
- Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
- Key-Value数据类型的Transfromation算子,这种变换并不触发提交 作业,针对处理的数据项是Key-Value型的数据对。
- Action算子,这类算子会触发SparkContext提交Job作业
更多详细算子介绍可以参考:Scala官方API
以下表格是关于Transformation算子-会延时加载(计算)
转换 | 含义 |
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | |
coalesce(numPartitions) | |
repartition(numPartitions) | |
repartitionAndSortWithinPartitions(partitioner) | |
以下表格是关于Action算子- 触发计算
动作 | 含义 |
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
7、RDD 算子操作案例:
案例1:映射和过滤
创建一个RDD,也可以使用List
val rdd1 = sc.parallelize(List(5,6,7,8,1,2,3,100,30))每个元素乘以2,再排序
val rdd2 = rdd1.map(_*2).sortBy(x=>x,true) #降序过滤出大于20的元素
val rdd3 = rdd2.filter(_ > 20)输出结果
rdd3.collect
案例2:字符串操作和字符计数
val rdd4 = sc.parallelize(Array("a b c","d b c","a y c"))val rdd5 = rdd4.flatMap(_.split(" ")) # 切分字符并切平val rdd6 = rdd5.map((_,1)) # 逐一统计val rdd7 = rdd6.reduceByKey(_+_).collect # 计数输出结果
案例3:集合操作
集合运算、去重
val rdd6 = sc.parallelize(List(5,6,7,8,1,2,3,100,30))
val rdd7 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))并集:union(如果是SQL中的集合运算,对集合是有要求的)
val rdd8 = rdd6.union(rdd7)
去重
rdd8.distinct.collect交集: intersect(如果是SQL中的集合运算,对集合是有要求的)
val rdd9 = rdd6.intersection(rdd7)
案例4:key-value操作
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect提示:交换了key-value的位置,并且交换了两次