热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Spark框架:RDD核心概念与操作详解

RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(ResilientDistributedDataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。

RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。

2. RDD的创建

Spark Core为我们提供了三种创建RDD的方式,包括:

  1. 使用程序中的集合创建RDD
  2. 使用HDFS文件创建RDD

2.1 Spark初始化

spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息,比如下面的代码:

SparkConf cOnf=new SparkConf();
        conf.set("参数", "参数值");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

2.2 使用程序中的集合创建RDD

List data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD distData = sc.parallelize(data);

2.3 使用HDFS文件创建RDD

//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD lines = sc.textFile("hdfs://master:9000/testFile/README.md");

3. RDD的两种算子

RDD支持两种类型的操作算子:Transformation与Action。

3.1 Transformation

Transformation操作会由一个RDD生成一个新的 RDD。Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到Actions操作时,才真正开始运算。

3.2 Action

Action操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。

例如,first() 就是的一个行动操作,它会返回 RDD 的第一个元素。

result = testlines.first()

transformations操作和Action操作的区别在于Spark计算RDD 的方式不同。对于在任何时候transformations得到的新的RDD,Spark只会惰性计算。只有在一个Action操作中用到时,才会真正计算。这种策略也是spark性能高的部分原因。

比如,我们读取一个文本文件创建一个RDD,然后把其中包含spark的行筛选出来。如果Spark在我们运行lines = sc.textFile(test.txt) 时就把文件中所有的行都读取到内存中并存储起来,内存开销会很大,而我们接下来的操作会筛选掉其中的很多数据。相反, 如果Spark 在知道了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。

事实上,在执行行动操作 first()时,Spark也只是扫描文件直到找到第一个匹配的行为止,而不是读取整个文件。

3.3 RDD常用算子

Spark学习(二)——RDD基础

Spark32个常用算子总结

Spark常用算子详解
Spark学习(二)——RDD基础
Spark学习(二)——RDD基础

4. RDD的持久化机制

blog.csdn.net/weixin_3560…

RDD还有一个叫持久化的机制,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(action)变得更加迅速。缓存是用Spark构建迭代算法的关键。RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。在Spark应用程序的调优中就会考虑到RDD的持久化的机制。

4.1 RDD持久化机制

  • Spark非常重要的一个功能特性就是可以将RDD 持久化在内存中,当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition,这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD ,而不需要计算多次该RDD
  • 要持久化一个RDD,只要调用其cache()或者persist()方法即可。但是并不是这两个方法被调用时立即缓存,在该RDD第一次被计算出来时(触发后面的action时),该RDD将会被缓存在计算节点的内存中,并供后面重用。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
  • cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

4.2 RDD的持久化的级别

顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:

object StorageLevel {
  val NOnE= new StorageLevel(false, false, false, false)
  val DISK_OnLY= new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_OnLY= new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false)
  ......
}

查看其构造参数

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  ......
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  ......
}

可以看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是 Java 虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。序列化方式存储对象可以节省磁盘或内存的空间,一般 序列化:反序列化=1:3
  • replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。例如:

val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)

注意:持久化的单位为Partition

注意:当使用RDD的MEMORY_ONLY进行持久化的时候,当内存空间不够的时候,不会报OOM,它会选择最小的partiton来持久化在内存,当重新的使用RDD时候,其他的partition会根据依赖关系重新计算

4.3 选择持久化级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  1. 如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。
  2. 如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
  3. 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
  4. 如果你希望更快的错误恢复,可以利用重复存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
  5. 在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
    • 它运行多个执行者共享Tachyon中相同的内存池
    • 它显著地减少垃圾回收的花费
    • 如果单个的执行者崩溃,缓存的数据不会丢失
Spark学习(二)——RDD基础

4.4 checkPoint

当业务场景非常的复杂的时候,RDD的lineage(血统)依赖会非常的长,一旦血统较后面的RDD数据丢失的时候,Spark会根据血统依赖重新的计算丢失的RDD,这样会造成计算的时间过长,Spark提供了一个叫checkPoint的算子来解决这样的业务场景。

Spark学习(二)——RDD基础
  • 为当前RDD设置检查点。该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移出。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

4.4.1 checkPoint优点

  1. 持久化在HDFS上,HDFS默认的3副本备份使得持久化的备份数据更加的安全
  2. 切断RDD的依赖关系:当业务场景复杂的时候,RDD的依赖关系非常的长的时候,当靠后的RDD数据丢失的时候,会经历较长的重新计算的过程,采用checkPoint会转为依赖checkPointRDD,可以避免长的lineage重新计算。

4.4.2 checkPoint的原理

  1. 当finalRDD执行Action类算子计算job任务的时候,Spark会从finalRDD从后往前回溯查看哪些RDD使用了checkPoint算子
  2. 将使用了checkPoint的算子标记
  3. Spark会自动的启动一个job来重新计算标记了的RDD,并将计算的结果存入HDFS,然后切断RDD的依赖关系
  4. 建议在执行checkpoint()方法之前先对rdd进行persisted操作。 在checkPoint的RDD之前先cache RDD,那么Spark就不用启动一个job来计算checkPoint的RDD,而是将持久化到内存的数据直接拷贝到HDFS,进而提高Spark的计算速度,提高应用程序的性能

5. 共享变量

Spark共享变量(广播变量、累加器)

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)

累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。

5.1 广播变量

有时会变量是在driver端创建的,但是因为需要在excutor端使用,所以driver会把变量以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个变量,如果这个变量非常大的时候,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。

Spark学习(二)——RDD基础

使用广播变量后:

Spark学习(二)——RDD基础
int factor = 3;
final Broadcast factorBroadcast = sc.broadcast(factor);
...
//在RDD计算中
int factor = factorBroadcast.value();

另外,为了确保所有的节点获得相同的变量,对象factorBroadcast广播后只读不能够被修改。

注意事项:

  • 能不能将一个RDD使用广播变量广播出去?

    不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。 广播变量只能在Driver端定义,不能在Executor端定义。

5.2 累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对一个变量并行操作的功能。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

final Accumulator sum = sc.accumulator(0);
...
//RDD计算中
sum.add(1);
...
//Driver端
System.out.println(sum.value());

累加器只能由Spark内部进行更新,并保证每个任务在累加器的更新操作仅执行一次,也就是说重启任务也不应该更新。在转换操作中,用户必须意识到任务和作业的调度过程重新执行会造成累加器的多次更新。

累加器同样具有Spark懒加载的求值模型。如果它们在RDD的操作中进行更新,它们的值只在RDD进行行动操作时才进行更新。


以上所述就是小编给大家介绍的《Spark学习(二)——RDD基础》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 我们 的支持!


推荐阅读
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • 本文探讨了Hive中内部表和外部表的区别及其在HDFS上的路径映射,详细解释了两者的创建、加载及删除操作,并提供了查看表详细信息的方法。通过对比这两种表类型,帮助读者理解如何更好地管理和保护数据。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
  • 网络运维工程师负责确保企业IT基础设施的稳定运行,保障业务连续性和数据安全。他们需要具备多种技能,包括搭建和维护网络环境、监控系统性能、处理突发事件等。本文将探讨网络运维工程师的职业前景及其平均薪酬水平。 ... [详细]
  • PHP 5.5.0rc1 发布:深入解析 Zend OPcache
    2013年5月9日,PHP官方发布了PHP 5.5.0rc1和PHP 5.4.15正式版,这两个版本均支持64位环境。本文将详细介绍Zend OPcache的功能及其在Windows环境下的配置与测试。 ... [详细]
  • 深入解析TCP/IP五层协议
    本文详细介绍了TCP/IP五层协议模型,包括物理层、数据链路层、网络层、传输层和应用层。每层的功能及其相互关系将被逐一解释,帮助读者理解互联网通信的原理。此外,还特别讨论了UDP和TCP协议的特点以及三次握手、四次挥手的过程。 ... [详细]
  • FinOps 与 Serverless 的结合:破解云成本难题
    本文探讨了如何通过 FinOps 实践优化 Serverless 应用的成本管理,提出了首个 Serverless 函数总成本估计模型,并分享了多种有效的成本优化策略。 ... [详细]
  • 本文详细介绍了 RosPack 类的功能和用法,探讨了其在 ROS 系统中的重要作用。RosPack 类提供了类似于终端命令 rospack 的功能,能够方便地查询和管理 ROS 包的相关信息。 ... [详细]
  • ElasticSearch 集群监控与优化
    本文详细介绍了如何有效地监控 ElasticSearch 集群,涵盖了关键性能指标、集群健康状况、统计信息以及内存和垃圾回收的监控方法。 ... [详细]
author-avatar
林斯诺_SNOW
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有