热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkShuffle原理讲解以及调优【小二讲堂】

Sparkshuffle一、SparkShuffle二、Spark1.6.x三、SortShuffle(Spark2.3.x,Spark1.6.x&#x


Spark shuffle

  • 一、Spark Shuffle
  • 二、Spark 1.6.x
  • 三、Sort Shuffle(Spark 2.3.x,Spark 1.6.x)
      • 1.普通运行机制
      • 2..bypass优化机制
  • 四、Spark Shuffle总结
      • 细节解释
  • 五、Spark shuffle调优


一、Spark Shuffle

在DAG调度的过程中,Stage阶段的划分是根据是否有shuffle过程,也就是存在ShuffleDependency宽依赖的时候,需要进行shuffle,这时候会将作业job划分成多个Stage;并且在划分Stage的时候,构建ShuffleDependency的时候进行shuffle注册,获取后续数据读取所需要的ShuffleHandle,最终每一个job提交后都会生成一个ResultStage和若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage. ResultStage与ShuffleMapStage中的task分别对应着ResultTask与ShuffleMapTask。一个作业,除了最终的ResultStage外,其他若干ShuffleMapStage中各个ShuffleMapTask都需要将最终的数据根据相应的Partitioner对数据进行分组,然后持久化分区的数据。


  • shuffle
    就是在不同stage上的相同key的数据文件都汇聚成一个文件,当然不同节点上都会形成,reduce task拉取数据的时候,将不同节点不同分区中相同key的数据拉取到一起,形成一个文件,这个文件中所有的key是相同的,是所有节点上相同key的集合。
  • 聚合类型
    – shuffle write:上一个stage的map task就必须保障自己处理的当前分区的数据相同的key写入到同一个分区文件中
    – shuffle read:reduce task就会从上一个stage的所有task所在的节点上寻找属于自己的分区文件。这样就可以保证每一个key,所对应的value,都会汇聚到同一个节点上去处理。
    在这里插入图片描述

上图是由task任务执行的stage中每个partition中的数据最后溢写成不同的小文件
在HashShuffle没有优化之前,每一个ShufflleMapTask会为每一个ReduceTask创建一个bucket缓存,并且会为每一个bucket创建一个文件。这个bucket存放的数据就是经过Partitioner操作(默认是HashPartitioner)之后找到对应的bucket然后放进去,最后将数据刷新bucket缓存的数据到磁盘上,即对应的block file。
然后ShuffleMapTask将输出作为MapStatus发送到DAGScheduler的MapOutputTrackerMaster,每一个MapStatus包含了每一个ResultTask要拉取的数据的位置和大小

ResultTask然后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster获取MapStatus,看哪一份数据是属于自己的,然后底层通过BlockManager将数据拉取过来

拉取过来的数据会组成一个内部的ShuffleRDD,优先放入内存,内存不够用则放入磁盘,然后ResulTask开始进行聚合,最后生成我们希望获取的那个MapPartitionRDD
缺点:
如上图所示:在这里有1个worker,2个executor,每一个executor运行2个ShuffleMapTask,有三个ReduceTask,所以总共就有4 * 3=12个bucket和12个block file。
#如果数据量较大,将会生成MR个小文件,比如ShuffleMapTask有100个,ResultTask有100个,这就会产生100100=10000个小文件
#bucket缓存很重要,需要将ShuffleMapTask所有数据都写入bucket,才会刷到磁盘,那么如果Map端数据过多,这就很容易造成内存溢出,尽管后面有优化,bucket写入的数据达到刷新到磁盘的阀值之后,就会将数据一点一点的刷新到磁盘,但是这样磁盘I/O就多了
在这里插入图片描述
在task任务进行向内存中溢写的时候,首先在executor中会根据reduce的个数分配三个缓冲区,task处理的数据,进入executor时会写入buffer缓冲区中,这里会通过key.hashCode%numBuffer,用key的哈希值对Buffer缓冲区的个数取模进行分配数据取那个缓冲区,这个Buffer缓冲区的大小默认是32k,当buffer缓冲区中的数据写满32k时,会将数据进行溢写到磁盘上,溢写成一个磁盘文件,这样当一个task执行完毕时,就会形成很多个task磁盘文件,当task很多时,这样形成的小文件就是特别多的,这样就会造成很多问题:产生的文件个数:MR(map个数reduce个数)

1.造成了IO量的剧增,当reduce进行拉取数据时,效率也就低下了。
2.初次之外还在进行溢写磁盘文件时,肯定会创建大量对象的,这里GC压力过大,导致OOM。
3.进行拉取数据时,首先可定会进行大量的连接,大连接数过多时,肯定会造成连接的中断问题过多。这样会造成taskScheduler进行重试(默认重试3次),三次不成功,DAGScheduler进行重新计算分配Task任务,重新跑task任务,这样的效率是极为低下的。


二、Spark 1.6.x

对上面问题进行的优化
1.HashShuffle
在这里插入图片描述
由上图可以看出运行原理基本和上面相同,就是在进行想缓冲去中写文件时进行了优化,由一个executor中的两个task分别共用一个buffer,这样两个task任务会将取模后相同的buffer缓冲区的数据写到一块 ,最后溢写成对应的一个磁盘小文件。减少了至少一半的磁盘小文件。虽然减少了磁盘小文件但是,在面对大数据的巨量数据集下,这种优化还是有着很大的问题的,和上面一样的问题。
磁盘文件个数:ER(一个Executor产生的小文件个数Reduce的个数)。


三、Sort Shuffle(Spark 2.3.x,Spark 1.6.x)


1.普通运行机制

在这里插入图片描述
在task进行运行的时候,有一个估算机制,会加你估算执行task任务之前所需要的缓冲区大小。
在申请内存的时候,估算机制是:2*估算值-当前。executor会专门开启一块内存用于后续内存的申请。
首先task会将数据一块内存中,然后达到一定的阈值之后进行溢写文件,溢写文件的时候,将数据向事先估算好的内存缓冲中,但是在这之前会将数据的key的hashCode对缓冲区的个数进行取模,这样根据取模的结果进行分配不同的数据到不同的缓冲区中,写到之前首先会进行排序,这里的排序和MapReduce中的分区排序一样。将数据加载到不同的内存缓冲区中。并且到内存缓冲区的值达到一定阈值后会溢写成响应的磁盘文件,最后当map task 执行完毕的时候,会将所有缓冲区中的文件进行溢写到对应的磁盘文件上。当mpa task执行完毕之后会通过归并排序将所有的数据文件合并成一个大文件,这里比如有0,1,2好对应的的文件,则会将不同的分区文件拉取到一起并且0号对应的文件会在大文件的前面,1号在后面,并且归并拉取的同时会创建一个索引文件,来记录数据的位置。
最后由reduce task会将0号对应的磁盘文件拉取过来,包括不同节点上的磁盘文件。


  • 总结:
    在执行过程中map task产生的磁盘小文件是2*M个,即一个map task会产生两个磁盘小文件,即一个是数据文件、另一个是数据文件对应的索引文件。
    这样大大减少了磁盘文件,磁盘IO访问量,以及数据拉取时的网络IO访问量。

2…bypass优化机制

在这里插入图片描述
和sortshuffle机制的运行原理相同,只不过少了排序的过程,这里是针对不需要进行排序的需求进行的运行机制。比如repartition,它只是进行重新分区而不需要文件排序。
这里的运行效率就大大提高了


  • 使用bypass的条件:
    1.reduce端不能有聚合类型的操作。
    2.reduce task的个数必须要小于spark.shuffle.sort.bypassMergeThreshold 参数的值默认是200个,如果reducetask个数过多时,可以设置这个参数的值。

四、Spark Shuffle总结

Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。
shuffle作为处理连接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle一般为shuffle的Write阶段,reduce端的shuffle一般为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不同,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle。

HashShuffle又分为普通机制和合并机制,普通机制因为其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到CoreR个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

在Spark 1.6以前,默认的shuffle计算引擎是HashShuffleManager,因为HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.6以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。


细节解释


  • shuffle中的定时器:
    定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式:
    applyMemory=nowMenory2-oldMemory
    申请的内存=当前的内存情况
    2-上一次的内嵌情况
    意思就是说内存数据结构的大小的动态变化,如果存储的数据超出内存数据结构的大小,将申请内存数据结构存储的数据*2-内存数据结构的设定值的内存大小空间。申请到了,内存数据结构的大小变大,内存不够,申请不到,则发生溢写。
  • 排序
    在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。
  • 溢写
    排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
  • merge
    一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
    SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
    1)block file= 2M
    一个map task会产生一个索引文件和一个数据大文件

  1. m*r>2m(r>2):SortShuffle会使得磁盘小文件的个数再次的减少

五、Spark shuffle调优


  • spark.shuffle.file.buffer
    默认值:32k
    参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.reducer.maxSizeInFlight
    默认值:48m
    参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
    调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
  • spark.shuffle.io.maxRetries
    默认值:3
    参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
    调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
  • spark.shuffle.io.retryWait
    默认值:5s
    参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
    调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
  • spark.shuffle.memoryFraction
    默认值:0.2
    参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
    调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
  • spark.shuffle.manager
    默认值:sort
    参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
    调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
  • spark.shuffle.sort.bypassMergeThreshold
    默认值:200
    参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
    调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
  • spark.shuffle.consolidateFiles
    默认值:false
    参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
    调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

MapReduce shuffle讲解:https://blog.csdn.net/Mirror_w/article/details/89421705
小二讲堂:https://blog.csdn.net/Mirror_w
Spark讲堂:https://blog.csdn.net/Mirror_w/article/details/89408567


推荐阅读
  • vue使用
    关键词: ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文详细介绍了git常用命令及其操作方法,包括查看、添加、提交、删除、找回等操作,以及如何重置修改文件、抛弃工作区修改、将工作文件提交到本地暂存区、从版本库中删除文件等。同时还介绍了如何从暂存区恢复到工作文件、恢复最近一次提交过的状态,以及如何合并多个操作等。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 全面介绍Windows内存管理机制及C++内存分配实例(四):内存映射文件
    本文旨在全面介绍Windows内存管理机制及C++内存分配实例中的内存映射文件。通过对内存映射文件的使用场合和与虚拟内存的区别进行解析,帮助读者更好地理解操作系统的内存管理机制。同时,本文还提供了相关章节的链接,方便读者深入学习Windows内存管理及C++内存分配实例的其他内容。 ... [详细]
  • 本文介绍了使用数据库管理员用户执行onstat -l命令来监控GBase8s数据库的物理日志和逻辑日志的使用情况,并强调了对已使用的逻辑日志是否及时备份的重要性。同时提供了监控方法和注意事项。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文总结和分析了JDK核心源码(2)中lang包下的基础知识,包括常用的对象类型包和异常类型包。在对象类型包中,介绍了Object类、String类、StringBuilder类、StringBuffer类和基本元素的包装类。在异常类型包中,介绍了Throwable类、Error类型和Exception类型。这些基础知识对于理解和使用JDK核心源码具有重要意义。 ... [详细]
author-avatar
鹤Cheire_295
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有