作者:朝阳又起风云 | 来源:互联网 | 2023-10-10 02:23
第一:Hashshuffle第二:shufflePluggable第三:SortedShuffle第四:Shuffle性能优化------------------
第一:Hash shuffle
第二:shuffle Pluggable
第三:Sorted Shuffle
第四:Shuffle 性能优化
-------------------------------
-------------------------------------------------
spark适合处理中小规模的数据?怎么理解?
spark版本低时只有hash一种方式,对大规模的数据无法适应,但是增加sorted以后,特别是2.0之后的trunsten推出,spark可以胜任任意规模的数据。
一:到底什么是Shuffle?
Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。
harli经典语录:
Shuffle就是洗牌,洗牌就是数据重组,重组就是修改数据的组织结构。如果前后组织结构相同,就不需要重组,即不需要洗牌。
对应的,就是协同分区的概念。例如:self.partitiOner== Some(partitioner) 这是什么情况下会发生的?
分区器相同,组织结构就相同,就不需要重组。分区器相同,分区器是用来组织数据的算法。所以需要多次迭代时,先重组,然后通过协同分区,避免迭代过程中多次重组。就像多次使用数据时,先缓存,避免迭代时多次读取数据到内存。co-partition就是分区器相同,协作,相同的分区器。
分区器是在rdd中的,每个rdd都有自己的数据组织形式,后rdd从前rdd获取分区数据,这时候如果分区器相同,数据组织结构相同,就直接获取整个分区,而不需要把分区内的数据重组到新的结构,新的分区中。Shuffle,输出时,就是根据后续rdd需要的结构(分区器)输出,对应得到的数据结构和后续RDD一样了,这样读取的时候就整块读取。需要注意一点,很多人经常忽略的,就是分区器本身是包含分区数的,不仅仅是分到哪个分区的算法,比如哪个key对应到哪个分区,同时还需要有相同的分区个数,虽然通常分区算法和分区个数也是有关联的!
这里是说RDD 有几个分区数量?
比如,都用key的hash进行分区,但个数不同时,数据组织形式就不同....
有一大块数据,这么数据可以分成几块小数据
二:Shuffle可能面临的问题?
运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了)。
1, 数据量非常大;
2, 数据如何分类,即如何Partition,Hash、Sort、钨丝计算;
3, 负载均衡(数据倾斜);
4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考虑的问题;
说明:具体的Task进行计算的时候尽一切最大可能使得数据具备Process Locality的特性;退而求次是增加数据分片,减少每个Task处理的数据量。
三:Hash Shuffle
1, key不能是Array;
2, Hash Shuffle不需要排序,此时从理论上讲就节省了Hadoop MapReduce中进行Shuffle需要排序时候的时间浪费,因为实际生产环境有大量的不需要排序的Shuffle类型;
思考:不需要排序的Hash Shuffle是否一定比需要排序的Sorted Shuffle速度更快?不一定!如果数据规模比较小的情形下,Hash Shuffle会比Sorted Shuffle速度快(很多)!但是如果数据量大,此时Sorted Shuffle一般都会比Hash Shuffle快(很多)
3,每个ShuffleMapTask会根据key的哈希值计算出当前的key需要写入的Partition,然后把决定后的结果写入当单独的文件,此时会
导致每个Task产生R(指下一个Stage的并行度)个文件(Task的个数等于当前stage最后一个RDD的partition的数目),如果当前的Stage中有M个ShuffleMapTask,则会M*R个文件!!!
注意:Shuffle操作绝大多数情况下都要通过网络,如果Mapper和Reducer在同一台机器上,此时只需要读取本地磁盘即可。
Hash Shuffle的两大死穴:第一:Shuffle前会产生海量的小文件于磁盘之上,此时会产生大量耗时低效的IO操作;第二:内存不共用!!!由于内存中需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较庞大的话,内存不可承受,出现OOM等问题
三:Sorted Shuffle:
为了改善上述的问题(同时打开过多文件导致Writer Handler内存使用过大以及产生过度文件导致大量的随机读写带来的效率极为低下的磁盘IO操作),Spark后来推出了Consalidate机制,来把小文件合并,此时Shuffle时文件产生的数量为cores*R,对于ShuffleMapTask的数量明显多于同时可用的并行Cores的数量的情况下,Shuffle产生的文件会大幅度减少,会极大降低OOM的可能;
为此Spark推出了Shuffle Pluggable开放框架,方便系统升级的时候定制Shuffle功能模块,也方便第三方系统改造人员根据实际的业务场景来开放具体最佳的Shuffle模块;核心接口ShuffleManager,具体默认实现有HashShuffleManager、SortShuffleManager等,Spark 1.6.0中具体的配置如下:
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")