本文详细介绍了Spark中的弹性分布式数据集(RDD)及其常见的操作方法,包括union、intersection、cartesian、subtract、join、cogroup等转换操作,以及count、collect、reduce、take、foreach、first、saveAsTextFile等行动操作。
弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)是 Spark 中的核心抽象,用于处理大规模数据集。RDD 是一个不可变的、分区的集合,可以在多个节点上并行处理。以下是几种常见的 RDD 操作示例:
union: 创建一个包含源 RDD 和参数所有元素的新 RDD。
Scala 示例:
val rdd1 = sc.parallelize(List('A', 'B'))
val rdd2 = sc.parallelize(List('B', 'C'))
rdd1.union(rdd2).collect()
结果:
Array[Char] = Array(A, B, B, C)
intersection: 创建一个只包含源 RDD 和参数公共元素的新 RDD。
Scala 示例:
rdd1.intersection(rdd2).collect()
结果:
Array[Char] = Array(B)
cartesian: 创建一个包含源 RDD 和参数所有元素的笛卡尔积的新 RDD。
Scala 示例:
rdd1.cartesian(rdd2).collect()
结果:
Array[(Char, Char)] = Array((A, B), (A, C), (B, B), (B, C))
subtract: 创建一个通过删除源 RDD 中与参数相同的数据元素的新 RDD。
Scala 示例:
rdd1.subtract(rdd2).collect()
结果:
Array[Char] = Array(A)
join: 当在 (K, V) 和 (K, W) 上调用时,此操作创建一个新的 RDD (K, (V, W))。
Scala 示例:
val persOnFruit= sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot")))
val persOnSE= sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista")))
personFruit.join(personSE).collect()
结果:
Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
cogroup: 将 (K, V) 转换为 (K, Iterable[V])。
Scala 示例:
personFruit.cogroup(personSE).collect()
结果:
Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy, (ArrayBuffer(Apple, Apricot), ArrayBuffer(Google))), (Charlie, (ArrayBuffer(Cherry), ArrayBuffer(Yahoo))), (Bob, (ArrayBuffer(Banana), ArrayBuffer(Bing, AltaVista))))
count: 获取 RDD 中的数据元素数。
Scala 示例:
val rdd = sc.parallelize(List('A', 'B', 'C'))
rdd.count()
结果:
LOng= 3
collect: 将 RDD 中的所有数据元素作为数组返回。
Scala 示例:
rdd.collect()
结果:
Array[Char] = Array(A, B, C)
reduce: 使用指定函数聚合 RDD 中的数据元素,该函数需要两个参数并返回一个值。
Scala 示例:
val rdd = sc.parallelize(List(1, 2, 3, 4))
rdd.reduce(_ + _)
结果:
Int = 10
take: 获取 RDD 中前 n 个数据元素。
Scala 示例:
rdd.take(2)
结果:
Array[Int] = Array(1, 2)
foreach: 为 RDD 中的每个数据元素执行指定函数。通常用于更新累加器或与外部系统交互。
Scala 示例:
rdd.foreach(x => println(s"$x * 10 = ${x * 10}"))
结果:
1 * 10 = 10
4 * 10 = 40
3 * 10 = 30
2 * 10 = 20
first: 检索 RDD 中的第一个数据元素。
Scala 示例:
rdd.first()
结果:
Int = 1
saveAsTextFile: 将 RDD 的内容写入文本文件或一组文本文件到本地文件系统或 HDFS。
Scala 示例:
val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt")
hamlet.filter(_.contains("Shakespeare")).saveAsTextFile("/users/akuntamukkala/temp/filtered")
结果:
akuntamukkala@localhost ~/temp/filtered $ ls
_SUCCESS part-00000 part-00001
更多详细信息请参阅:Spark 官方文档