热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Spark框架:RDD核心概念与操作详解

RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(ResilientDistributedDataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。

RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。

2. RDD的创建

Spark Core为我们提供了三种创建RDD的方式,包括:

  1. 使用程序中的集合创建RDD
  2. 使用HDFS文件创建RDD

2.1 Spark初始化

spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息,比如下面的代码:

SparkConf cOnf=new SparkConf();
        conf.set("参数", "参数值");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

2.2 使用程序中的集合创建RDD

List data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD distData = sc.parallelize(data);

2.3 使用HDFS文件创建RDD

//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD lines = sc.textFile("hdfs://master:9000/testFile/README.md");

3. RDD的两种算子

RDD支持两种类型的操作算子:Transformation与Action。

3.1 Transformation

Transformation操作会由一个RDD生成一个新的 RDD。Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到Actions操作时,才真正开始运算。

3.2 Action

Action操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。

例如,first() 就是的一个行动操作,它会返回 RDD 的第一个元素。

result = testlines.first()

transformations操作和Action操作的区别在于Spark计算RDD 的方式不同。对于在任何时候transformations得到的新的RDD,Spark只会惰性计算。只有在一个Action操作中用到时,才会真正计算。这种策略也是spark性能高的部分原因。

比如,我们读取一个文本文件创建一个RDD,然后把其中包含spark的行筛选出来。如果Spark在我们运行lines = sc.textFile(test.txt) 时就把文件中所有的行都读取到内存中并存储起来,内存开销会很大,而我们接下来的操作会筛选掉其中的很多数据。相反, 如果Spark 在知道了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。

事实上,在执行行动操作 first()时,Spark也只是扫描文件直到找到第一个匹配的行为止,而不是读取整个文件。

3.3 RDD常用算子

Spark学习(二)——RDD基础

Spark32个常用算子总结

Spark常用算子详解
Spark学习(二)——RDD基础
Spark学习(二)——RDD基础

4. RDD的持久化机制

blog.csdn.net/weixin_3560…

RDD还有一个叫持久化的机制,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速。缓存是用Spark构建迭代算法的关键。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。在Spark应用程序的调优中就会考虑到RDD的持久化的机制。

4.1 RDD持久化机制

  • Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition,这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD
  • 要持久化一个RDD,只要调用其cache()或者persist()方法即可。但是并不是这两个方法被调用时立即缓存,在该RDD第一次被计算出来时(触发后面的action时),该RDD将会被缓存在计算节点的内存中,并供后面重用。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
  • cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

4.2 RDD的持久化的级别

顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:

object StorageLevel {
  val NOnE= new StorageLevel(false, false, false, false)
  val DISK_OnLY= new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_OnLY= new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

查看其构造参数

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是 Java 虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。序列化方式存储对象可以节省磁盘或内存的空间,一般 序列化:反序列化=1:3
  • replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。例如:

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)

注意:持久化的单位为Partition

注意:当使用RDD的MEMORY_ONLY进行持久化的时候,当内存空间不够的时候,不会报OOM,它会选择最小的partiton来持久化在内存,当重新的使用RDD时候,其他的partition会根据依赖关系重新计算

4.3 选择持久化级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  1. 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。
  2. 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
  3. 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
  4. 如果你希望更快的错误恢复,可以利用重复存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
  5. 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
    • 它运行多个执行者共享Tachyon中相同的内存池
    • 它显著地减少垃圾回收的花费
    • 如果单个的执行者崩溃,缓存的数据不会丢失
Spark学习(二)——RDD基础

4.4 checkPoint

当业务场景非常的复杂的时候,RDD的lineage(血统)依赖会非常的长,一旦血统较后面的RDD数据丢失的时候,Spark会根据血统依赖重新的计算丢失的RDD,这样会造成计算的时间过长,Spark提供了一个叫checkPoint的算子来解决这样的业务场景。

Spark学习(二)——RDD基础
  • 为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

4.4.1 checkPoint优点

  1. 持久化在HDFS上,HDFS默认的3副本备份使得持久化的备份数据更加的安全
  2. 切断RDD的依赖关系:当业务场景复杂的时候,RDD的依赖关系非常的长的时候,当靠后的RDD数据丢失的时候,会经历较长的重新计算的过程,采用checkPoint会转为依赖checkPointRDD,可以避免长的lineage重新计算。

4.4.2 checkPoint的原理

  1. 当finalRDD执行Action类算子计算job任务的时候,Spark会从finalRDD从后往前回溯查看哪些RDD使用了checkPoint算子
  2. 将使用了checkPoint的算子标记
  3. Spark会自动的启动一个job来重新计算标记了的RDD,并将计算的结果存入HDFS,然后切断RDD的依赖关系
  4. 建议在执行checkpoint()方法之前先对rdd进行persisted操作。 在checkPoint的RDD之前先cache RDD,那么Spark就不用启动一个job来计算checkPoint的RDD,而是将持久化到内存的数据直接拷贝到HDFS,进而提高Spark的计算速度,提高应用程序的性能

5. 共享变量

Spark共享变量(广播变量、累加器)

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)

累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。

5.1 广播变量

有时会变量是在driver端创建的,但是因为需要在excutor端使用,所以driver会把变量以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个变量,如果这个变量非常大的时候,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。

Spark学习(二)——RDD基础

使用广播变量后:

Spark学习(二)——RDD基础
int factor = 3;
final Broadcast factorBroadcast = sc.broadcast(factor);
...
//在RDD计算中
int factor = factorBroadcast.value();

另外,为了确保所有的节点获得相同的变量,对象factorBroadcast广播后只读不能够被修改。

注意事项:

  • 能不能将一个RDD使用广播变量广播出去?

    不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 广播变量只能在Driver端定义,不能在Executor端定义。

5.2 累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对一个变量并行操作的功能。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

final Accumulator sum = sc.accumulator(0);
...
//RDD计算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());

累加器只能由Spark内部进行更新,并保证每个任务在累加器的更新操作仅执行一次,也就是说重启任务也不应该更新。在转换操作中,用户必须意识到任务和作业的调度过程重新执行会造成累加器的多次更新。

累加器同样具有Spark懒加载的求值模型。如果它们在RDD的操作中进行更新,它们的值只在RDD进行行动操作时才进行更新。


以上所述就是小编给大家介绍的《Spark学习(二)——RDD基础》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 我们 的支持!


推荐阅读
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 深入解析JVM垃圾收集器
    本文基于《深入理解Java虚拟机:JVM高级特性与最佳实践》第二版,详细探讨了JVM中不同类型的垃圾收集器及其工作原理。通过介绍各种垃圾收集器的特性和应用场景,帮助读者更好地理解和优化JVM内存管理。 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 本文介绍了如何使用 Spring Boot DevTools 实现应用程序在开发过程中自动重启。这一特性显著提高了开发效率,特别是在集成开发环境(IDE)中工作时,能够提供快速的反馈循环。默认情况下,DevTools 会监控类路径上的文件变化,并根据需要触发应用重启。 ... [详细]
  • 本文探讨了Hive中内部表和外部表的区别及其在HDFS上的路径映射,详细解释了两者的创建、加载及删除操作,并提供了查看表详细信息的方法。通过对比这两种表类型,帮助读者理解如何更好地管理和保护数据。 ... [详细]
  • Java 类成员初始化顺序与数组创建
    本文探讨了Java中类成员的初始化顺序、静态引入、可变参数以及finalize方法的应用。通过具体的代码示例,详细解释了这些概念及其在实际编程中的使用。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 本文介绍如何使用 Python 将一个字符串按照指定的行和元素分隔符进行两次拆分,最终将字符串转换为矩阵形式。通过两种不同的方法实现这一功能:一种是使用循环与 split() 方法,另一种是利用列表推导式。 ... [详细]
  • 本文介绍如何在 Android 中通过代码模拟用户的点击和滑动操作,包括参数说明、事件生成及处理逻辑。详细解析了视图(View)对象、坐标偏移量以及不同类型的滑动方式。 ... [详细]
  • Windows服务与数据库交互问题解析
    本文探讨了在Windows 10(64位)环境下开发的Windows服务,旨在定期向本地MS SQL Server (v.11)插入记录。尽管服务已成功安装并运行,但记录并未正确插入。我们将详细分析可能的原因及解决方案。 ... [详细]
author-avatar
林斯诺_SNOW
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有