热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark面对OOM问题的解决方法及优化总结(转载)

Spark面对OOM问题的解决方法及优化总结(转载)转载地址:http:blog.csdn.netyhb315279058articledetails51035631
Spark面对OOM问题的解决方法及优化总结 (转载)

转载地址: http://blog.csdn.net/yhb315279058/article/details/51035631

Spark中的OOM问题不外乎以下两种情况
  • map执行中内存溢出
  • shuffle后内存溢出
map执行中内存溢出代表了所有map类型的操作,包括:flatMap,filter,mapPatitions等。shuffle后内存溢出的shuffle操作包括join,reduceByKey,repartition等操作。后面先总结一下我对Spark内存模型的理解,再总结各种OOM的情况相对应的解决办法和性能优化方面的总结。如果理解有错,希望在评论中指出。

Spark 内存模型:
Spark在一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一块是other内存。
  • execution内存是执行内存,文档中说join,aggregate都在这部分内存中执行,shuffle的数据也会先缓存在这个内存中,满了再写入磁盘,能够减少IO。其实map过程也是在这个内存中执行的。
  • storage内存是存储broadcast,cache,persist数据的地方。
  • other内存是程序执行时预留给自己的内存。
execution和storage是Spark Executor中内存的大户,other占用内存相对少很多,这里就不说了。在spark-1.6.0以前的版本,execution和storage的内存分配是固定的,使用的参数配置分别是spark.shuffle.memoryFraction(execution内存占Executor总内存大小,default 0.2)和spark.storage.memoryFraction(storage内存占Executor内存大小,default 0.6),因为是1.6.0以前这两块内存是互相隔离的,这就导致了Executor的内存利用率不高,而且需要根据Application的具体情况,使用者自己来调节这两个参数才能优化Spark的内存使用。在spark-1.6.0以上的版本,execution内存和storage内存可以相互借用,提高了内存的Spark中内存的使用率,同时也减少了OOM的情况。
在Spark-1.6.0后加入了堆外内存,进一步优化了Spark的内存使用,堆外内存使用JVM堆以外的内存,不会被gc回收,可以减少频繁的full gc,所以在Spark程序中,会长时间逗留再Spark程序中的大内存对象可以使用堆外内存存储。使用堆外内存有两种方式,一种是在rdd调用persist的时候传入参数StorageLevel.OFF_HEAP,这种使用方式需要配合Tachyon一起使用。另外一种是使用Spark自带的spark.memory.offHeap.enabled 配置为true进行使用,但是这种方式在1.6.0的版本还不支持使用,只是多了这个参数,在以后的版本中会开放。
OOM的问题通常出现在execution这块内存中,因为storage这块内存在存放数据满了之后,会直接丢弃内存中旧的数据,对性能有影响但是不会有OOM的问题。

内存溢出解决方法:
1. map过程产生大量对象导致内存溢出:
这种溢出的原因是在单个map中产生了大量的对象导致的&#xff0c;例如&#xff1a;rdd.map(x&#61;>for(i <- 1 to 10000) yield i.toString)&#xff0c;这个操作在rdd中&#xff0c;每个对象都产生了10000个对象&#xff0c;这肯定很容易产生内存溢出的问题。针对这种问题&#xff0c;在不增加内存的情况下&#xff0c;可以通过减少每个Task的大小&#xff0c;以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法&#xff0c;分区成更小的块传入map。例如&#xff1a;rdd.repartition(10000).map(x&#61;>for(i <- 1 to 10000) yield i.toString)。
面对这种问题注意&#xff0c;不能使用rdd.coalesce方法&#xff0c;这个方法只能减少分区&#xff0c;不能增加分区&#xff0c;不会有shuffle的过程。

2.数据不平衡导致内存溢出&#xff1a;
数据不平衡除了有可能导致内存溢出外&#xff0c;也有可能导致性能的问题&#xff0c;解决方法和上面说的类似&#xff0c;就是调用repartition重新分区。这里就不再累赘了。

3.coalesce调用导致内存溢出&#xff1a;
这是我最近才遇到的一个问题&#xff0c;因为hdfs中不适合存小问题&#xff0c;所以Spark计算后如果产生的文件太小&#xff0c;我们会调用coalesce合并文件再存入hdfs中。但是这会导致一个问题&#xff0c;例如在coalesce之前有100个文件&#xff0c;这也意味着能够有100个Task&#xff0c;现在调用coalesce(10)&#xff0c;最后只产生10个文件&#xff0c;因为coalesce并不是shuffle操作&#xff0c;这意味着coalesce并不是按照我原本想的那样先执行100个Task&#xff0c;再将Task的执行结果合并成10个&#xff0c;而是从头到位只有10个Task在执行&#xff0c;原本100个文件是分开执行的&#xff0c;现在每个Task同时一次读取10个文件&#xff0c;使用的内存是原来的10倍&#xff0c;这导致了OOM。解决这个问题的方法是令程序按照我们想的先执行100个Task再将结果合并成10个文件&#xff0c;这个问题同样可以通过repartition解决&#xff0c;调用repartition(10)&#xff0c;因为这就有一个shuffle的过程&#xff0c;shuffle前后是两个Stage&#xff0c;一个100个分区&#xff0c;一个是10个分区&#xff0c;就能按照我们的想法执行。

4.shuffle后内存溢出&#xff1a;
shuffle内存溢出的情况可以说都是shuffle后&#xff0c;单个文件过大导致的。在Spark中&#xff0c;join&#xff0c;reduceByKey这一类型的过程&#xff0c;都会有shuffle的过程&#xff0c;在shuffle的使用&#xff0c;需要传入一个partitioner&#xff0c;大部分Spark中的shuffle操作&#xff0c;默认的partitioner都是HashPatitioner&#xff0c;默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) &#xff0c; spark.default.parallelism参数只对HashPartitioner有效&#xff0c;所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出&#xff0c;就需要从partitioner的代码增加partitions的数量。

5. standalone模式下资源分配不均匀导致内存溢出&#xff1a;
在standalone的模式下如果配置了--total-executor-cores 和 --executor-memory 这两个参数&#xff0c;但是没有配置--executor-cores这个参数的话&#xff0c;就有可能导致&#xff0c;每个Executor的memory是一样的&#xff0c;但是cores的数量不同&#xff0c;那么在cores数量多的Executor中&#xff0c;由于能够同时执行多个Task&#xff0c;就容易导致内存溢出的情况。这种情况的解决方法就是同时配置--executor-cores或者spark.executor.cores参数&#xff0c;确保Executor资源分配均匀。

6.在RDD中&#xff0c;共用对象能够减少OOM的情况&#xff1a;
这个比较特殊&#xff0c;这里说记录一下&#xff0c;遇到过一种情况&#xff0c;类似这样rdd.flatMap(x&#61;>for(i <- 1 to 1000) yield ("key","value"))导致OOM&#xff0c;但是在同样的情况下&#xff0c;使用rdd.flatMap(x&#61;>for(i <- 1 to 1000) yield "key"&#43;"value")就不会有OOM的问题&#xff0c;这是因为每次("key","value")都产生一个Tuple对象&#xff0c;而"key"&#43;"value"&#xff0c;不管多少个&#xff0c;都只有一个对象&#xff0c;指向常量池。具体测试如下&#xff1a;

这个例子说明("key","value")和("key","value")在内存中是存在不同位置的,也就是存了两份,但是"key"&#43;"value"虽然出现了两次,但是只存了一份,在同一个地址,这用到了JVM常量池的知识.于是乎,如果RDD中有大量的重复数据,或者Array中需要存大量重复数据的时候我们都可以将重复数据转化为String,能够有效的减少内存使用.

优化&#xff1a;
这一部分主要记录一下到spark-1.6.1版本&#xff0c;笔者觉得有优化性能作用的一些参数配置和一些代码优化技巧&#xff0c;在参数优化部分&#xff0c;如果笔者觉得默认值是最优的了&#xff0c;这里就不再记录。
代码优化技巧&#xff1a;
1.使用mapPartitions代替大部分map操作&#xff0c;或者连续使用的map操作&#xff1a;
这里需要稍微讲一下RDD和DataFrame的区别。RDD强调的是不可变对象&#xff0c;每个RDD都是不可变的&#xff0c;当调用RDD的map类型操作的时候&#xff0c;都是产生一个新的对象&#xff0c;这就导致了一个问题&#xff0c;如果对一个RDD调用大量的map类型操作的话&#xff0c;每个map操作会产生一个到多个RDD对象&#xff0c;这虽然不一定会导致内存溢出&#xff0c;但是会产生大量的中间数据&#xff0c;增加了gc操作。另外RDD在调用action操作的时候&#xff0c;会出发Stage的划分&#xff0c;但是在每个Stage内部可优化的部分是不会进行优化的&#xff0c;例如rdd.map(_&#43;1).map(_&#43;1)&#xff0c;这个操作在数值型RDD中是等价于rdd.map(_&#43;2)的&#xff0c;但是RDD内部不会对这个过程进行优化。DataFrame则不同&#xff0c;DataFrame由于有类型信息所以是可变的&#xff0c;并且在可以使用sql的程序中&#xff0c;都有除了解释器外&#xff0c;都会有一个sql优化器&#xff0c;DataFrame也不例外&#xff0c;有一个优化器Catalyst&#xff0c;具体介绍看后面参考的文章。
上面说到的这些RDD的弊端&#xff0c;有一部分就可以使用mapPartitions进行优化&#xff0c;mapPartitions可以同时替代rdd.map,rdd.filter,rdd.flatMap的作用&#xff0c;所以在长操作中&#xff0c;可以在mapPartitons中将RDD大量的操作写在一起&#xff0c;避免产生大量的中间rdd对象&#xff0c;另外是mapPartitions在一个partition中可以复用可变类型&#xff0c;这也能够避免频繁的创建新对象。使用mapPartitions的弊端就是牺牲了代码的易读性。

2.broadcast join和普通join&#xff1a;
在大数据分布式系统中&#xff0c;大量数据的移动对性能的影响也是巨大的。基于这个思想&#xff0c;在两个RDD进行join操作的时候&#xff0c;如果其中一个RDD相对小很多&#xff0c;可以将小的RDD进行collect操作然后设置为broadcast变量&#xff0c;这样做之后&#xff0c;另一个RDD就可以使用map操作进行join&#xff0c;这样能够有效的减少相对大很多的那个RDD的数据移动。

3.先filter在join&#xff1a;
这个就是谓词下推&#xff0c;这个很显然&#xff0c;filter之后再join&#xff0c;shuffle的数据量会减少&#xff0c;这里提一点是spark-sql的优化器已经对这部分有优化了&#xff0c;不需要用户显示的操作&#xff0c;个人实现rdd的计算的时候需要注意这个。

4.partitonBy优化&#xff1a;
这一部分在另一篇文章《spark partitioner使用技巧 》有详细介绍&#xff0c;这里不说了。

5. combineByKey的使用&#xff1a;
这个操作在Map-Reduce中也有&#xff0c;这里举个例子&#xff1a;rdd.groupByKey().mapValue(_.sum)比rdd.reduceByKey的效率低&#xff0c;原因如下两幅图所示(网上盗来的&#xff0c;侵删)



上下两幅图的区别就是上面那幅有combineByKey的过程减少了shuffle的数据量&#xff0c;下面的没有。combineByKey是key-value型rdd自带的API&#xff0c;可以直接使用。

6. 在内存不足的使用&#xff0c;使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的&#xff0c;在内存不足的时候rdd.cache()的数据会丢失&#xff0c;再次使用的时候会重算&#xff0c;而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘&#xff0c;避免重算&#xff0c;只是消耗点IO时间。

7.在spark使用hbase的时候&#xff0c;spark和hbase搭建在同一个集群&#xff1a;
在spark结合hbase的使用中&#xff0c;spark和hbase最好搭建在同一个集群上上&#xff0c;或者spark的集群节点能够覆盖hbase的所有节点。hbase中的数据存储在HFile中&#xff0c;通常单个HFile都会比较大&#xff0c;另外Spark在读取Hbase的数据的时候&#xff0c;不是按照一个HFile对应一个RDD的分区&#xff0c;而是一个region对应一个RDD分区。所以在Spark读取Hbase的数据时&#xff0c;通常单个RDD都会比较大&#xff0c;如果不是搭建在同一个集群&#xff0c;数据移动会耗费很多的时间。

参数优化部分&#xff1a;
8. spark.driver.memory (default 1g)&#xff1a;
这个参数用来设置Driver的内存。在Spark程序中&#xff0c;SparkContext&#xff0c;DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行&#xff0c;如果用户自己写的程序有过多的步骤&#xff0c;切分出过多的Stage&#xff0c;这部分信息消耗的是Driver的内存&#xff0c;这个时候就需要调大Driver的内存。

9. spark.rdd.compress (default false) &#xff1a;
这个参数在内存吃紧的时候&#xff0c;又需要persist数据有良好的性能&#xff0c;就可以设置这个参数为true&#xff0c;这样在使用persist(StorageLevel.MEMORY_ONLY_SER)的时候&#xff0c;就能够压缩内存中的rdd数据。减少内存消耗&#xff0c;就是在使用的时候会占用CPU的解压时间。

10. spark.serializer (default org.apache.spark.serializer.JavaSerializer )
建议设置为 org.apache.spark.serializer.KryoSerializer&#xff0c;因为KryoSerializer比JavaSerializer快&#xff0c;但是有可能会有些Object会序列化失败&#xff0c;这个时候就需要显示的对序列化失败的类进行KryoSerializer的注册&#xff0c;这个时候要配置spark.kryo.registrator参数或者使用参照如下代码&#xff1a;
valconf&#61;newSparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1]
,classOf[MyClass2]))
valsc &#61;newSparkContext(conf)

11. spark.memory.storageFraction (default 0.5)
这个参数设置内存表示 Executor内存中 storage/(storage&#43;execution)&#xff0c;虽然spark-1.6.0&#43;的版本内存storage和execution的内存已经是可以互相借用的了&#xff0c;但是借用和赎回也是需要消耗性能的&#xff0c;所以如果明知道程序中storage是多是少就可以调节一下这个参数。

12.spark.locality.wait (default 3s)&#xff1a;
spark中有4中本地化执行level&#xff0c;PROCESS_LOCAL->NODE_LOCAL->RACK_LOCAL->ANY,一个task执行完&#xff0c;等待spark.locality.wait时间如果&#xff0c;第一次等待PROCESS的Task到达&#xff0c;如果没有&#xff0c;等待任务的等级下调到NODE再等待spark.locality.wait时间&#xff0c;依次类推&#xff0c;直到ANY。分布式系统是否能够很好的执行本地文件对性能的影响也是很大的。如果RDD的每个分区数据比较多&#xff0c;每个分区处理时间过长&#xff0c;就应该把 spark.locality.wait 适当调大一点&#xff0c;让Task能够有更多的时间等待本地数据。特别是在使用persist或者cache后&#xff0c;这两个操作过后&#xff0c;在本地机器调用内存中保存的数据效率会很高&#xff0c;但是如果需要跨机器传输内存中的数据&#xff0c;效率就会很低。

13. spark.speculation (default false):
一个大的集群中&#xff0c;每个节点的性能会有差异&#xff0c;spark.speculation这个参数表示空闲的资源节点会不会尝试执行还在运行&#xff0c;并且运行时间过长的Task&#xff0c;避免单个节点运行速度过慢导致整个任务卡在一个节点上。这个参数最好设置为true。与之相配合可以一起设置的参数有spark.speculation.×开头的参数。参考中有文章详细说明这个参数。

以后有遇到新的内容再补充。

参考&#xff1a;
1. http://www.jianshu.com/p/c0181667daa0
2. http://www.csdn.net/article/2015-06-18/2824958
3. https://chenzhongpu.gitbooks.io/bigdatanotes/content/SparkSQLOptimizer/index.html
4. http://book.51cto.com/art/201409/453045.htm


转载于:https://www.cnblogs.com/jiangxiaoxian/p/7442022.html


推荐阅读
  • 单页面应用 VS 多页面应用的区别和适用场景
    本文主要介绍了单页面应用(SPA)和多页面应用(MPA)的区别和适用场景。单页面应用只有一个主页面,所有内容都包含在主页面中,页面切换快但需要做相关的调优;多页面应用有多个独立的页面,每个页面都要加载相关资源,页面切换慢但适用于对SEO要求较高的应用。文章还提到了两者在资源加载、过渡动画、路由模式和数据传递方面的差异。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 阿里Treebased Deep Match(TDM) 学习笔记及技术发展回顾
    本文介绍了阿里Treebased Deep Match(TDM)的学习笔记,同时回顾了工业界技术发展的几代演进。从基于统计的启发式规则方法到基于内积模型的向量检索方法,再到引入复杂深度学习模型的下一代匹配技术。文章详细解释了基于统计的启发式规则方法和基于内积模型的向量检索方法的原理和应用,并介绍了TDM的背景和优势。最后,文章提到了向量距离和基于向量聚类的索引结构对于加速匹配效率的作用。本文对于理解TDM的学习过程和了解匹配技术的发展具有重要意义。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 本文介绍了在Oracle数据库中创建序列时如何选择cache或nocache参数。cache参数可以提高序列的存取速度,但可能会导致序列丢失;nocache参数可以避免序列丢失,但在高并发访问时可能导致性能问题。文章详细解释了两者的区别和使用场景。 ... [详细]
  • Oracle seg,V$TEMPSEG_USAGE与Oracle排序的关系及使用方法
    本文介绍了Oracle seg,V$TEMPSEG_USAGE与Oracle排序之间的关系,V$TEMPSEG_USAGE是V_$SORT_USAGE的同义词,通过查询dba_objects和dba_synonyms视图可以了解到它们的详细信息。同时,还探讨了V$TEMPSEG_USAGE的使用方法。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 本文介绍了H5游戏性能优化和调试技巧,包括从问题表象出发进行优化、排除外部问题导致的卡顿、帧率设定、减少drawcall的方法、UI优化和图集渲染等八个理念。对于游戏程序员来说,解决游戏性能问题是一个关键的任务,本文提供了一些有用的参考价值。摘要长度为183字。 ... [详细]
  • 初识java关于JDK、JRE、JVM 了解一下 ... [详细]
  • 生产环境下JVM调优参数的设置实例
     正文前先来一波福利推荐: 福利一:百万年薪架构师视频,该视频可以学到很多东西,是本人花钱买的VIP课程,学习消化了一年,为了支持一下女朋友公众号也方便大家学习,共享给大家。福利二 ... [详细]
  • 尾部|柜台_Java并发线程池篇附场景分析
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Java并发-线程池篇-附场景分析相关的知识,希望对你有一定的参考价值。作者:汤圆个人博客 ... [详细]
author-avatar
艳斐儿M
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有