热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark:RDD简介及基础算子

本文主要介绍SparkCore的核心内容:RDD。包含以下章节和对应的内容章节内容1RDD简介2RDD分区3RDD的依赖关系4RDD的缓存机制和区别5RDD创建的两

本文主要介绍Spark Core的核心内容:RDD。包含以下章节和对应的内容

章节内容
1RDD简介
2RDD分区
3RDD的依赖关系
4RDD的缓存机制和区别
5RDD创建的两种方式
6RDD算子和总结
7RDD 算子操作案例

 


1、RDD简介

RDD(Resilient Distributed Datasets,弹性分布式数据集),是Spark最为核心的概念,自然也是理解Apache Spark 工作原理的最佳入口之一。

RDD的特点:

  1. 是一个分区的只读记录的集合;
  2. 一个具有容错机制的特殊集;
  3. 只能通过在稳定的存储器或其他RDD上的确定性操作(转换)来创建;
  4. 可以分布在集群的节点上,以函数式操作集合的方式,进行各种并行操作

RDD之所以为“弹性”的特点

  1. 基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错);
  2. Task如果失败会自动进行特定次数的重试(默认4次);
  3. Stage如果失败会自动进行特定次数的重试(可以值运行计算失败的阶段),只计算失败的数据分片;
  4. 数据调度弹性:DAG TASK 和资源管理无关;
  5. checkpoint;
  6. 自动的进行内存和磁盘数据存储的切换

2、RDD的分区

首先,RDD是一个逻辑概念,分区是一个物理概念,并且带有下标

举例:通过SparkContext.parallelize创建一个RDD: 

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)      # 后面的数字3 表示3个分区

也就是说,RDD 内部的数据集合在逻辑上(以及物理上)被划分成多个小集合,这样的每一个小集合被称为分区。

那么引出两个问题:

  1. 如何查看分区运行在哪个Worker(机器)上?
  2. 如何查看每个分区中的数据?

针对问题1和2: 在源码级别,RDD 类内存储一个 Partition 列表。每个 Partition 对象都包含一个 index 成员,通过 RDD 编号 + index 就能从唯一确定分区的 Block 编号,持久化的 RDD 就能通过这个 Block 编号从存储介质中获得对应的分区数据。

 

3、RDD的依赖关系

RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

  1. 窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
  2. 宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition

 

4、RDD的缓存机制和区别

RDD缓存机制有两种,cache和pesist,两种区别如下:

  1. cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
  2.  cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;3)
  3. executor执行的时候,默认60%做cache,40%做task操作,persist最根本的函数,最底层的函数

5、RDD创建的两种方式

  1:通过SparkContext.parallelize创建
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)    # 3个分区


  2:通过读取外部的数据源创建:比如:HDFS、本地目录

val rdd1 = sc.textFile("hdfs://bigdata:9000/input/data.txt")    # HDFS文件
val rdd2 = sc.textFile("/root/temp/data.txt")   # 本地文件

 

6、RDD算子

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

从大方向来说:RDD算子分为Transformation算子 Action 算子,其中Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。Action 算子会触发 SparkContext 提交 Job 作业,并将数据输出 Spark系统

从小方向来说:RDD 算子大致可以分为以下三类:

  1. Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
  2. Key-Value数据类型的Transfromation算子,这种变换并不触发提交 作业,针对处理的数据项是Key-Value型的数据对。
  3. Action算子,这类算子会触发SparkContext提交Job作业

更多详细算子介绍可以参考:Scala官方API

以下表格是关于Transformation算子-会延时加载(计算)

转换

含义

map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDD

intersection(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks]))

对源RDD进行去重后返回一个新的RDD

groupByKey([numTasks])   

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

 

sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活

join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

cartesian(otherDataset)

笛卡尔积

pipe(command, [envVars])

 

coalesce(numPartitions)    

 

repartition(numPartitions)

 

repartitionAndSortWithinPartitions(partitioner)

 

 

以下表格是关于Action算子- 触发计算

动作

含义

reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的

collect()

在驱动程序中,以数组的形式返回数据集的所有元素

count()

返回RDD的元素个数

first()

返回RDD的第一个元素(类似于take(1))

take(n)

返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

takeOrdered(n[ordering])

 

saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

saveAsObjectFile(path

 

countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

 

7、RDD 算子操作案例:

案例1:映射和过滤

创建一个RDD,也可以使用List
val rdd1 = sc.parallelize(List(5,6,7,8,1,2,3,100,30))每个元素乘以2,再排序
val rdd2 = rdd1.map(_*2).sortBy(x=>x,true) #降序过滤出大于20的元素
val rdd3 = rdd2.filter(_ > 20)输出结果
rdd3.collect

 

案例2:字符串操作和字符计数

val rdd4 = sc.parallelize(Array("a b c","d b c","a y c"))val rdd5 = rdd4.flatMap(_.split(" ")) # 切分字符并切平val rdd6 = rdd5.map((_,1)) # 逐一统计val rdd7 = rdd6.reduceByKey(_+_).collect # 计数输出结果

 

 

案例3:集合操作

集合运算、去重
val rdd6 = sc.parallelize(List(5,6,7,8,1,2,3,100,30))
val rdd7 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))并集:union(如果是SQL中的集合运算,对集合是有要求的)
val rdd8 = rdd6.union(rdd7)
去重
rdd8.distinct.collect交集: intersect(如果是SQL中的集合运算,对集合是有要求的)
val rdd9 = rdd6.intersection(rdd7)

 

 

案例4:key-value操作

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect提示:交换了key-value的位置,并且交换了两次


推荐阅读
author-avatar
河南王修华
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有