热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark性能优化>spark工作原理

从本篇文章开始,将开启spark学习和总结之旅,专门针对如何提高spark性能进

从本篇文章开始,将开启 spark 学习和总结之旅,专门针对如何提高 spark 性能进行总结,力图总结出一些干货。

无论你是从事算法工程师,还是数据分析又或是其他与数据相关工作,利用 spark 进行海量数据处理和建模都是非常重要和必须掌握的一门技术,我感觉编写 spark 代码是比较简单的,特别是利用 Spark SQL 下的 DataFrame 接口进行数据处理,只要有 python 基础都是非常容易入门的,但是在性能调优上,许多人都是一知半解,写的 spark 程序经常陷入 OOM 或卡死状态。这时深入了解 spark 原理就显得非常有必要了。

本系列总结主要针对 Hadoop YARN 模式。

RDD(Resilient Distributed Datasets)

RDD 是 spark 中最基本的数据抽象,存储在 exector 或 node 中,它代表一个 “惰性,”“静态”,“不可变”,“分布式“的数据集合,RDD 基本介绍在网上上太多了,这里就不做详细介绍了,主要讲下以下内容:

transform(转换)与 action(执行)的区别

转换操作:返回的是一个新的 RDD,常见的如:map、filter、flatMap、groupByKey 等等
执行操作:返回的是一个结果,一个数值或者是写入操作等,如 reduce、collect、count、first 等等

惰性计算

spark 中计算 RDD 是惰性的,也即 RDD 真正被计算(执行操作,例如写入存储操作、collect 操作等)时,其转换操作才会真正被执行。spark 为什么采用惰性计算:

在 MapReduce 中,大量的开发人员浪费在最小化 MapReduce 通过次数上。通过将操作合并在一起来实现。在 Spark 中,我们不创建单个执行图,而是将许多简单的操作结合在一起。因此,它造成了 Hadoop MapReduce 与 Apache Spark 之间的差异。

惰性设计的好处:

① 提高可管理性
可以查看整个 DAG(将对数据执行的所有转换的图形),并且可以使用该信息来优化计算。

② 降低时间复杂度和加快计算速度
只运算真正要计算的转换操作,并且可以根据 DAG 图,合并不需要与 drive 通信的操作(连续的依赖转换),例如在一个 RDD 上同时调用 map 和 filter 转换操作,spark 可以将 map 和 filter 指令发送到每个 executor 上,spark 程序在真正执行 map 和 filter 时,只需访问一次 record,而不是发送两组指令并两次访问分区。理论上相对于非惰性,将时间复杂度降低了一半。例如:

val list1 = list.map(i -> i * 3) // Transformation1
val list2 = list1.map(i -> i + 3) // Transformation1
val list3 = list1.map(i -> i / 3) // Transformation1
list3.collect() // ACTION

假设原始列表(list) 很大,其中包含数百万个元素。如果没有懒惰的评估,我们将完成三遍如此庞大的计算。如果我们假设一次这样的列表迭代需要 10 秒,那么整个评估就需要 30 秒。并且每个 RDD 都会缓存下来,浪费内存。使用惰性评估,Spark 可以将这三个转换像这样合并到一个转换中,如下:

val list3 = list.map(i -> i + 1)

它将只执行一次该操作。只需一次迭代即可完成,这意味着只需要 10 秒的时间。

容错性

RDD 本身包含其复制所需的所有依赖信息,一旦该 RDD 中某个分区丢失了,该 RDD 有足够需要重新计算的信息,可以去并行的,很快的重新计算丢失的分区。

运行在内存

在 spark application 的生命周期中,RDD 始终常驻内存(在所在的节点内存),这也是其比 MapReduce 更快的重要原因。

spark 中提供了三种内存管理机制:

① in-memory as deserialized data
这种常驻内存方式速度快(因为去掉了序列化时间),但是内存利用效率低。

② in-memory as serialized data
该方法内存利用效率高,但是速度慢

③ 直接存在 disk 上
对于那些较大容量的 RDD,没办法直接存在内存中,需要写入到 DISK 上。该方法仅适用于大容量 RDD。要持久化一个 RDD,只要调用其 cache()或者 persist()方法即可。在该 RDD 第一次被计算出来时,就会直接缓存在每个节点中。而且 Spark 的持久化机制还是自动容错的,如果持久化的 RDD 的任何 partition 丢失了,那么 Spark 会自动通过其源 RDD,使用 transformation 操作重新计算该 partition。

cache()和 persist()的区别在于,cache()是 persist()的一种简化方式,cache()的底层就是调用的 persist()的无参版本,同时就是调用 persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中清楚缓存,那么可以使用 unpersist()方法。

我们来仔细分析下持久化和非持久化的区别:

非持久化:持久化:

显然对于要复用多次的 RDD,要将其进行持久化操作,此时 Spark 就会根据你的持久化策略,将 RDD 中的数据保存到内存或者磁盘中。以后每次对这个 RDD 进行算子操作时,都会直接从内存或磁盘中提取持久化的 RDD 数据,然后执行算子,而不会从源头处重新计算一遍这个 RDD,再执行算子操作。 所以在写 spark 代码时:尽可能复用同一个 RDD。

这里常有个误区:

val rdd1 = ... // 读取hdfs数据,加载成RDD
rdd1.cache // 持久化操作


val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)


rdd1.unpersist // 释放缓存


rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)

如果按上述代码进行持久化,则效果就如同没有持久化一样。原因就在于 spark 的 lazy 计算。

代码应该如下:

val rdd1 = ... // 读取hdfs数据,加载成RDD
rdd1.cache


val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)


rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)


rdd1.unpersist

rdd2 执行 take 时,会先缓存 rdd1,接下来直接 rdd3 执行 take 时,直接利用缓存的 rdd1,最后,释放掉 rdd1。所以在何处释放 RDD 也是非常需要细心的。 请在 action 之后 unpersisit!!!

Spark Job Scheduling

窄依赖 与 宽依赖

shuffle 过程,简单来说,就是将分布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或 join 等操作。比如 reduceByKey、join 等算子,都会触发 shuffle 操作。shuffle 操作需要将数据进行重新聚合和划分,然后分配到集群的各个节点上进行下一个 stage 操作,这里会涉及集群不同节点间的大量数据交换。由于不同节点间的数据通过网络进行传输时需要先将数据写入磁盘,因此集群中每个节点均有大量的文件读写操作,从而导致 shuffle 操作十分耗时(相对于 map 操作)。

窄依赖:父 RDD 与 子 RDD 的分区是一对一(map 操作)或多对一(coalesce)的,不会有 shuffle 过程;并且子 RDD 的分区结果与其 key 和 value 值无关,每个分区与其他分区亦无关。

上面左图可对应 map 操作分区,右图对应 coalesce 操作。

宽依赖:父 RDD 与子 RDD 的分区是一对多的关系,并且是按一定方式进行重分区,会有 shuffle 过程产生,比较耗时,可能会引发 spark 性能问题。常见的宽依赖操作如:groupByKey、reduceByKey、sort、sortByKey 等等。注意:coalesce 操作如果是将 10 个分区换成 100 个分区,由少分区转成大分区将会发生 shuffle 过程。coalesce 操作场景主要是 rdd 经过多层过滤后的小文件合并。rdd 的 reparation 方法与 coalesce 相反,主要是为了 处理数据倾斜,增加 partiton 的数量使得每个 task 处理的数据量减少,肯定会有 shuffle 过程产生(repartition 其实调用的就是 coalesce,只不过 shuffle = true (coalesce 中 shuffle: Boolean = false))。

Spark Application

一个 spark 应用主要由一系列的 spark Job 组成,而这些 spark Job 由 sparkContext 定义而来。当 SparkContent 启动时,一个 driver 和一系列的 executor 会在集群的工作节点上启动。每个 executor 都有个 JVM 虚拟环境,一个 executor 不能跨越多个节点。

上图表示在一个分布式系统上启动一个 spark application 的物理硬件层面流程。

  1. 启动一个 SparkContext
  2. 驱动程序(driver program)会定义一个集群管理(cluster manager)
  3. cluster manager 会在工作节点上启动一些 executor,运行提交的代码(注意:一个节点 node 上会有多个 executor,但是一个 executor 不能跨越多个 node

需要注意以下两点:

  • 一个节点 node 上会有多个 executor,但是一个 executor 不能跨越多个 node
  • 每个 executor 会有多个分区,但是一个分区不能跨越多个 executor

DAG(Directed Acyclic Graph)详解

spark Application tree

简而言之:一个 spark Application 由多个 Job 组成,Job 由提交代码中的 Action 操作定义,而一个 Action 操作由多个 Stage 组成,Stage 的分割由宽依赖进行分割的,而每个 Stage 又由多个 Task 组成。一个 Task 对应一个分区,一个 task 会被分配到一个 executor 上执行。

每个 Job 都对应一个 DAG 图,每个 DAG 有一系列的 Stage 组成。

  • Job:每个 Job 对应一个 Action 操作,在 spark execution Graph 中,其边是基于代码中的 transform 操作的依赖关系定义的。
  • Stages:每个 Action 中可能包含一个或多个 transform 操作,其中宽依赖又将 Job 划分成多个 Stage。因为 Stages 的边缘需要和 driver 进行通信,故通常一个 Job 里,必须顺序的执行 Stages 而非并行。并且会将多个窄依赖步骤合并成一个步骤,因为其中没有的转换操作没有 shuffle 过程,可以通过只访问一次数据,连续执行多个 transform 操作,这也是上面提到的惰性计算的优点

def simpleSparkProgram(rdd : RDD[Double]): LOng={
//stage1
rdd.filter(_<1000.0)
.map(x => (x, x) )
//stage2
.groupByKey()
.map{ case(value, groups) => (groups.sum, value)}
//stage 3
.sortByKey()
.count()
}

其代码中对应的 Stage 如下:

  • Task:task 是 spark 中最小最基本的执行单元,每个 task 代表一个局部的计算任务。在 executor 中可以有多个 core,而每个 core 可以对应一个 task,每个 task 针对一个分区。 每次针对不同的一块分区,执行相同的代码。

注意:

  • spark 中同时并行的 task 数量不能超过所有 executor core 数量。 其中 所有 executor cores 数量= 每个 executor 中 core 数量 * executor 数量。
  • task 的并行化是有 executor 数量 × core 数量决定的。task 过多,并行化过小,就会浪费时间;反之就会浪费资源。所以设置参数是一个需要权衡的过程,原则就是在已有的资源情况下,充分利用内存和并行化。

总结

对于 DAG 的深刻理解非常重要,如果理解不深刻则可能定位问题的效率不高。比如常见的数据倾斜。当理解了这些,如果出现了数据倾斜,可以分析 job,stage 和 task,找到部分 task 输入的严重不平衡,最终定位是数据问题或计算逻辑问题。

参考

  • High Performance Spark
  • https://www.quora.com/What-is-the-reason-behind-keeping-lazy-evaluation-in-Apache-Spark
  • https://data-flair.training/blogs/apache-spark-lazy-evaluation/
  • http://bourneli.github.io/scala/spark/2016/06/17/spark-unpersist-after-action.html




推荐阅读
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了在Oracle数据库中创建序列时如何选择cache或nocache参数。cache参数可以提高序列的存取速度,但可能会导致序列丢失;nocache参数可以避免序列丢失,但在高并发访问时可能导致性能问题。文章详细解释了两者的区别和使用场景。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • web.py开发web 第八章 Formalchemy 服务端验证方法
    本文介绍了在web.py开发中使用Formalchemy进行服务端表单数据验证的方法。以User表单为例,详细说明了对各字段的验证要求,包括必填、长度限制、唯一性等。同时介绍了如何自定义验证方法来实现验证唯一性和两个密码是否相等的功能。该文提供了相关代码示例。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • Explain如何助力SQL语句的优化及其分析方法
    本文介绍了Explain如何助力SQL语句的优化以及分析方法。Explain是一个数据库SQL语句的模拟器,通过对SQL语句的模拟返回一个性能分析表,从而帮助工程师了解程序运行缓慢的原因。文章还介绍了Explain运行方法以及如何分析Explain表格中各个字段的含义。MySQL 5.5开始支持Explain功能,但仅限于select语句,而MySQL 5.7逐渐支持对update、delete和insert语句的模拟和分析。 ... [详细]
  • Todayatworksomeonetriedtoconvincemethat:今天在工作中有人试图说服我:{$obj->getTableInfo()}isfine ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了软件测试知识点之数据库压力测试方法小结相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • php缓存ri,浅析ThinkPHP缓存之快速缓存(F方法)和动态缓存(S方法)(日常整理)
    thinkPHP的F方法只能用于缓存简单数据类型,不支持有效期和缓存对象。S()缓存方法支持有效期,又称动态缓存方法。本文是小编日常整理有关thinkp ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • 本文介绍了在PostgreSQL中批量导入数据时的优化方法。包括使用unlogged表、删除重建索引、删除重建外键、禁用触发器、使用COPY方法、批量插入等。同时还提到了一些参数优化的注意事项,如设置effective_cache_size、shared_buffer等,并强调了在导入大量数据后使用analyze命令重新收集统计信息的重要性。 ... [详细]
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
author-avatar
枫的Lilyxj
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有