热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Sparkshuffle详细过程

有许多场景下,我们需要进行跨服务器的数据整合,比如两个表之间,通过Id进行join操作,你必须确保所有具有相同id的数据整合到相同的块文件中。那么我们先说一下mapreduce的shuffle过程。

有许多场景下,我们需要进行跨服务器的数据整合,比如两个表之间,通过Id进行join操作,你必须确保所有具有相同id的数据整合到相同的块文件中。那么我们先说一下mapreduce的shuffle过程。

 

Mapreduce的shuffle的计算过程是在executor中划分mapper与reducer。Spark的Shuffling中有两个重要的压缩参数。spark.shuffle.compress true---是否将会将shuffle中outputs的过程进行压缩。将spark.io.compression.codec编码器设置为压缩数据,默认是true.同时,通过spark.shuffle.manager 来设置shuffle时的排序算法,有hash,sort,tungsten-sort。(用hash会快一点,我不需要排序啊~)

 

Hash Shuffle

使用hash散列有很多缺点,主要是因为每个Map task都会为每个reduce生成一份文件,所以最后就会有M * R个文件数量。那么如果在比较多的Map和Reduce的情况下就会出问题,输出缓冲区的大小,系统中打开文件的数量,创建和删除所有这些文件的速度都会受到影响。如下图:

 

这里有一个优化的参数spark.shuffle.consolidateFiles,默认为false,当设置成true时,会对mapper output时的文件进行合并。如果你集群有E个executors(“-num-excutors”)以及C个cores("-executor-cores”),以及每个task又T个CPUs(“spark.task.cpus”),那么总共的execution的slot在集群上的个数就是E * C / T(也就是executor个数×CORE的数量/CPU个数)个,那么shuffle过程中所创建的文件就为E * C / T * R(也就是executor个数 × core的个数/CPU个数×reduce个数)个。外文文献写的太公式化,那么我用通俗易懂的形式阐述下。就好比总共的并行度是20(5个executor,4个task)  Map阶段会将数据写入磁盘,当它完成时,他将会以reduce的个数来生成文件数。那么每个executor就只会计算core的数量/cpu个数的tasks.如果task数量大于总共集群并行度,那么将开启下一轮,轮询执行。

速度较快,因为没有再对中间结果进行排序,减少了reduce打开文件时的性能消耗。

当然,当数据是经过序列化以及压缩的。当重新读取文件,数据将进行解压缩与反序列化,这里reduce端数据的拉取有个参数spark.reducer.maxSizeInFlight(默认为48MB),它将决定每次数据从远程的executors中拉取大小。这个拉取过程是由5个并行的request,从不同的executor中拉取过来,从而提升了fetch的效率。 如果你加大了这个参数,那么reducers将会请求更多的文数据进来,它将提高性能,但是也会增加reduce时的内存开销

 

 

Sort Shuffle

Sort Shuffle如同hash shuffle map写入磁盘,reduce拉取数据的一个性质,当在进行sort shuffle时,总共的reducers要小于spark.shuffle.sort.bypassMergeThrshold(默认为200),将会执行回退计划,使用hash将数据写入单独的文件中,然后将这些小文件聚集到一个文件中,从而加快了效率。(实现自BypassMergeSortShuffleWriter中)

那么它的实现逻辑是在reducer端合并mappers的输出结果。Spark在reduce端的排序是用了TimSort,它就是在reduce前,提前用算法进行了排序。  那么用算法的思想来说,合并的M N个元素进行排序,那么其复杂度为O(MNlogM) 具体算法不讲了~要慢慢看~

随之,当你没有足够的内存保存map的输出结果时,在溢出前,会将它们disk到磁盘,那么缓存到内存的大小便是 spark.shuffle.memoryFraction * spark.shuffle.safyFraction.默认的情况下是”JVM Heap Size * 0.2 * 0.8 = JVM Heap Size * 0.16”。需要注意的是,当你多个线程同时在一个executor中运行时(spark.executor.cores/spark.task.cpus 大于1的情况下),那么map output的每个task将会拥有 “JVM Heap Size * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus。运行原理如下图:

使用此种模式,会比使用hashing要慢一点,可通过bypassMergeThreshold找到集群的最快平衡点。

 

Tungsten Sort

使用此种排序方法的优点在于,操作的二进制数据不需要进行反序列化。它使用 sun.misc.Unsafe模式进行直接数据的复制,因为没有反序列化,所以直接是个字节数组。同时,它使用特殊的高效缓存器ShuffleExtemalSorter压记录与指针以及排序的分区id.只用了8 Bytes的空间的排序数组。这将会比使用CPU缓存要效率。

每个spill的数据、指针进行排序,输出到一个索引文件中。随后将这些partitions再次合并到一个输出文件中。

 

本文翻译自一位国外大神的博客:https://0x0fff.com/spark-memory-management/


推荐阅读
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 本文介绍了H5游戏性能优化和调试技巧,包括从问题表象出发进行优化、排除外部问题导致的卡顿、帧率设定、减少drawcall的方法、UI优化和图集渲染等八个理念。对于游戏程序员来说,解决游戏性能问题是一个关键的任务,本文提供了一些有用的参考价值。摘要长度为183字。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 无损压缩算法专题——LZSS算法实现
    本文介绍了基于无损压缩算法专题的LZSS算法实现。通过Python和C两种语言的代码实现了对任意文件的压缩和解压功能。详细介绍了LZSS算法的原理和实现过程,以及代码中的注释。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • 本文介绍了2020年计算机二级MSOffice的选择习题及答案,详细解析了操作系统的五大功能模块,包括处理器管理、作业管理、存储器管理、设备管理和文件管理。同时,还解答了算法的有穷性的含义。 ... [详细]
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • Java 11相对于Java 8,OptaPlanner性能提升有多大?
    本文通过基准测试比较了Java 11和Java 8对OptaPlanner的性能提升。测试结果表明,在相同的硬件环境下,Java 11相对于Java 8在垃圾回收方面表现更好,从而提升了OptaPlanner的性能。 ... [详细]
  • Windows7企业版怎样存储安全新功能详解
    本文介绍了电脑公司发布的GHOST WIN7 SP1 X64 通用特别版 V2019.12,软件大小为5.71 GB,支持简体中文,属于国产软件,免费使用。文章还提到了用户评分和软件分类为Win7系统,运行环境为Windows。同时,文章还介绍了平台检测结果,无插件,通过了360、腾讯、金山和瑞星的检测。此外,文章还提到了本地下载文件大小为5.71 GB,需要先下载高速下载器才能进行高速下载。最后,文章详细解释了Windows7企业版的存储安全新功能。 ... [详细]
author-avatar
mobiledu2502858407
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有