热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark核心编程RDD

一.RDD概述1.1.什么是RDD​RDD

一.RDD概述

1.1.什么是RDD

​ RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

➢ 弹性
⚫ 存储的弹性:内存与磁盘的自动切换;
⚫ 容错的弹性:数据丢失可以自动恢复;
⚫ 计算的弹性:计算出错重试机制;
⚫ 分片的弹性:可根据需要重新分片。
➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD 封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现
➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,
在新的RDD 里面封装计算逻辑
➢ 可分区、并行计算

1.1.1.什么是分布式计算

模拟分布式计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dhwSUDS7-1648901089136)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401085316318.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2eL7ZNAt-1648901089138)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401085830738.png)]

1.1.2 RDD的实现原理

以java IO实现方式(装饰者模式)来理解rdd的实现原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nciPSky5-1648901089140)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401090433484.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cUm1KN5j-1648901089141)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401090321692.png)]

rdd数据处理方式类似于IO,使用了装饰者设计模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BgGLnB1r-1648901089142)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/image-20220401090920474.png)]

1.2.RDD的核心属性

2.1分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性

2.2分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

2.3RDD之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

2.4分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

2.5首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

1.3.执行原理

​ 从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。

​ Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的 计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

​ RDD 是 Spark 框架中用于数据处理的核心模型,以yarn集群来解释rdd的工作原理

  1. 启动 Yarn 集群环境

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VhUzMIPh-1648901089144)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps1.jpg)]

  1. Spark 通过申请资源创建调度节点和计算节点

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ec1hFTaa-1648901089145)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps2.jpg)]

  1. Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(p代表不同分区)

Driver中,多个rdd形成关联后(一般复杂计算逻辑都是由多个rdd关联组合成复杂逻辑),根据分区分解成一个一个的task,进入到任务迟中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fB2Sh7X9-1648901089147)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps3.jpg)]

  1. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Q6rwxrNJ-1648901089150)(Spark%E8%BF%90%E8%A1%8C%E5%8E%9F%E7%90%86.assets/wps4.jpg)]

从以上流程可以看出 RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给Executor 节点执行计算,接下来我们就一起看看 Spark 框架中RDD 是具体是如何进行数据处理的。

二.RDD的创建方式

2.1.从集合(内存)中创建RDD

从集合中创建RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val rdd1 =
sparkContext.parallelize(List(1,
2,3,4)
)
val rdd2 =
sparkContext.makeRDD(List(1,
2,3,4)
)
rdd1.collect().foreach(println)

从底层代码实现来讲,makeRDD 方法其实就是parallelize 方法

def makeRDD[T:
ClassTag](seq:
Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope
{parallelize(seq, numSlices)

2.2.从外部存储(文件)创建RDD

由外部存储系统的数据集创建RDD 包括:本地的文件系统,所有Hadoop 支持的数据集, 比如HDFS、HBase 等。

val sparkConf =
new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()

三.RDD主要算子介绍

RDD分为转换算子和行动算子

3.1.转换算子

3.1.1 map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

val rdd = spark.makeRDD(List(1, 2, 3, 4))
val mapRdd = rdd.map(num => {
num * 2
})
mapRdd.collect().foreach(println)

3.1.2 flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

val rdd = spark.makeRDD(List(List(1, 2),List(3,4)))
val flatMapRdd = rdd.flatMap(list => {
list
})
flatMapRdd.collect().foreach(println)

1
2
3
4

3.1.3 groupBy

​ 将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为shuffle。极限情况下,数据可能被分在同一个分区中。

​ 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

val rdd = spark.makeRDD(List(1,2,3,4),1)
val groupByRdd = rdd.groupBy(num => {
num % 2
})
groupByRdd.saveAsTextFile("output")

3.1.4 filter

​ 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出 现数据倾斜。

val rdd = spark.makeRDD(List(1,2,3,4),2)
val filterRdd = rdd.filter(num => {
num % 2 == 0
})
filterRdd.saveAsTextFile("output")

3.1.5 sortBy

​ 该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程

val rdd = spark.makeRDD(List(6,4,3,2,5,1),2)
val sortByRdd = rdd.sortBy(num => num)
sortByRdd.collect().foreach(println)

1
2
3
4
5
6

3.1.6 reduceByKey

可以将数据按照相同的Key 对 Value 进行聚合

reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。

val rdd = spark.makeRDD(List(("a",2),("a",3),("c",4)))
val reduceByKeyRdd = rdd.reduceByKey((x, y) => x + y)
reduceByKeyRdd.collect().foreach(println)

(a,5)
(c,4)

3.1.7 groupByKey

将数据源的数据根据 key 对 value 进行分组

val rdd = spark.makeRDD(List(("a",2),("a",3),("c",4)))
val groupByKeyRdd = rdd.groupByKey()
groupByKeyRdd.collect().foreach(println)

(a,CompactBuffer(2, 3))
(c,CompactBuffer(4))

3.2.行动算子

3.2.1 reduce

聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

val rdd = spark.makeRDD(List(1,2,3,4))
val result = rdd.reduce((num1, num2) => num1 + num2)
println(result)

10

3.2.2 collect

在驱动程序中,以数组Array 的形式返回数据集的所有元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集数据到 Driver
rdd.collect().foreach(println)

3.2.3 count

返回RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()

3.2.4 take

返回一个由RDD 的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))

3.2.5 countByKey

统计每种 key 的个数

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
"b"), (3, "c"), (3, "c")))
// 统计每种 key 的个数
val result: collection.Map[Int, Long] = rdd.countByKey()

3.2.6 save相关算子

将数据保存到不同格式的文件中

// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")

四.RDD序列化和依赖关系

4.1 序列化和闭包检查

4.1.1.为什么要序列化

object SerializerDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {
println("age=" + user.age + num)
})
sc.stop()
}
class User {
var age: Int = 30
}
}

报错:Task not serializable

从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor端执行

报错原因:Spark 算子外部Driver端构建了User对象,算子内部执行时(在Executor执行)使用了User对象。
算子内部在Excutor执行时需要Driver端将User对象传递过来,而User对象要在网络传递需要被序列化

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OhPoTOab-1648901089152)(Spark运行原理.assets/image-20220402094901562.png)]

解决办法:User类extends Serializable即可;或者再User前加case

4.1.2.闭包检查

object SerializerDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SerializerDemo")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(List())
val user = new User()
// list 为空foreach应该不会执行,但仍然报错
rdd.foreach(num => {
println("age=" + user.age + num)
})
sc.stop()
}
class User{
var age: Int = 30
}
}

报错:Task not serializable
报错位置:Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
还未到任务执行阶段,在ClosureCleaner$.ensureSerializable处检测序列化时报错

报错原因:从计算的角度, 算子以外的代码都是在Driver 端执行, 算子里面的代码都是在 Executor
端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就
形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor
端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列
化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

4.2.RDD依赖关系

4.2.1 RDD的依赖关系及血缘关系

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-70TSvYfd-1648901089154)(Spark运行原理.assets/image-20220402104207888.png)]

4.2.2 RDD血缘关系的保存

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-O3iqU3NX-1648901089155)(Spark运行原理.assets/image-20220402104658771.png)]

rdd保存依赖关系的示意图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-53TTprp6-1648901089156)(Spark运行原理.assets/image-20220402105040292.png)]

4.2.3宽窄依赖

窄依赖(OneToOne依赖)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QCQMSXN7-1648901089158)(Spark运行原理.assets/image-20220402110202532.png)]

宽依赖(Shuffle依赖)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yyAVachj-1648901089159)(Spark运行原理.assets/image-20220402110412404.png)]

4.3 RDD阶段划分

窄依赖产生的任务个数(窄依赖不需要划分阶段)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fR8Vi6yC-1648901089160)(Spark运行原理.assets/image-20220402111415428.png)]

宽依赖产生的任务个数(宽依赖会划分阶段)

下一个阶段的执行需要等待上一个阶段的任务全部执行完成

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NN3OhtKD-1648901089162)(Spark运行原理.assets/image-20220402111305958.png)]

rdd阶段的划分

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5dlPz6Bs-1648901089163)(Spark运行原理.assets/image-20220402113128210.png)]

4.4 RDD任务的划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task
⚫ Application:初始化一个 SparkContext 即生成一个Application;
⚫ Job:一个Action 算子就会生成一个Job;
⚫ Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
⚫ Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
一个应用程序对应多个Job(即一个应用程序中可能有多个行动算子),一个Job中可能会出现多个宽依赖(即一个Job中可能出现多个阶段),每个阶段最后一个RDD使用的分区数就是Task的当前阶段任务个数

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XCZvkytM-1648901089164)(Spark运行原理.assets/image-20220402114133981.png)]


推荐阅读
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了在Docker容器技术中限制容器对CPU的使用的方法,包括使用-c参数设置容器的内存限额,以及通过设置工作线程数量来充分利用CPU资源。同时,还介绍了容器权重分配的情况,以及如何通过top命令查看容器在CPU资源紧张情况下的使用情况。 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 嵌入式处理器的架构与内核发展历程
    本文主要介绍了嵌入式处理器的架构与内核发展历程,包括不同架构的指令集的变化,以及内核的流水线和结构。通过对ARM架构的分析,可以更好地理解嵌入式处理器的架构与内核的关系。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • Java和JavaScript是什么关系?java跟javaScript都是编程语言,只是java跟javaScript没有什么太大关系,一个是脚本语言(前端语言),一个是面向对象 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
author-avatar
手机用户2602881147
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有