热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Spark】Spark存储原理shuffle过程

本篇结构:SparkShuffle的发展SparkShuffle中数据结构SparkShuffle原理后记SparkShuffle是sparkjob中某些算子触发的操作。当rdd依

本篇结构:

  • Spark Shuffle 的发展
  • Spark Shuffle 中数据结构
  • Spark Shuffle 原理
  • 后记

Spark Shuffle 是 spark job 中某些算子触发的操作。当 rdd 依赖中出现宽依赖的时候,就会触发 Shuffle 操作,Shuffle 操作通常会伴随着不同 executor/host 之间数据的传输。

Shuffle 操作可能涉及的过程包括数据的排序,聚合,溢写,合并,传输,磁盘IO,网络的 IO 等等。Shuffle 是连接 MapTask 和 ReduceTask 之间的桥梁,Map 的输出到 Reduce 中须经过 Shuffle 环节,Shuffle 的性能高低直接影响了整个程序的性能和吞吐量。

通常 Shuffle 分为两部分:Map 阶段的数据准备( ShuffleMapTask )和Reduce(ShuffleReduceTask) 阶段的数据拷贝处理。一般将在 Map 端的 Shuffle 称之为 Shuffle Write,在 Reduce 端的 Shuffle 称之为 Shuffle Read。

一、Spark Shuffle 的发展

Spark Shuffle 机制总共有三种:

1.1、未优化的 HashShuffle

每一个 ShuffleMapTask 都会为每一个 ReducerTask 创建一个单独的文件,总的文件数是 M * R,其中 M 是 ShuffleMapTask 的数量,R 是 ShuffleReduceTask 的数量。

见下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

在处理大数据时,ShuffleMapTask 和 ShuffleReduceTask 的数量很多,创建的磁盘文件数量 M*R 也越多,大量的文件要写磁盘,再从磁盘读出来,不仅会占用大量的时间,而且每个磁盘文件记录的句柄都会保存在内存中(每个人大约 100k),因此也会占用很大的内存空间,频繁的打开和关闭文件,会导致频繁的GC操作,很容易出现 OOM 的情况。

也正是上述原因,该 HashShuffle 如今已退出历史舞台。

1.2、优化后 HashShuffle

在 Spark 0.8.1 版本中,引入了 Consolidation 机制,该机制是对 HashShuffle 的一种优化。

如下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。

先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。

这样,每个 worker 持有的文件数降为 cores * R。cores 代表核数,R 是 ShuffleReduceTask 数。

1.3、Sort-Based Shuffle

由于 HashShuffle 会产生很多的磁盘文件,引入 Consolidation 机制虽然在一定程度上减少了磁盘文件数量,但是不足以有效提高 Shuffle 的性能,适合中小型数据规模的大数据处理。

为了让 Spark 在更大规模的集群上更高性能处理更大规模的数据,因此在 Spark 1.1版本中,引入了 SortShuffle。

如下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

该机制每一个 ShuffleMapTask 都只创建一个文件,将所有的 ShuffleReduceTask 的输入都写入同一个文件,并且对应生成一个索引文件。

以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer 缓存所占用的内存大小,而且同时避免 GC 的风险和频率。

但对于 Rueducer 数比较少的情况,Hash Shuffle 要比 Sort Shuffle 快,因此 Sort Shuffle 有个 “fallback” 计划,对于 Reducers 数少于 “spark.shuffle.sort.bypassMergeThreshold” (200 by default),将使用 fallback 计划,hashing 相关数据到分开的文件,然后合并这些文件为一个。

二、Spark Shuffle 中数据结构

2.1、AppendOnlyMap

AppendOnlyMap 单从命名上来看,是一个只能追加元素的 Map 结构。的确,它是只支持追加的 map,可以修改某个 key 对应的 value,但是不能删除已经存在的 key。

底层是由数组结构实现的,当需要对 Key-Value 进行聚合时,会使用AppendOnlyMap 作为 buffer。在插入或者修改元素的时候,会判断是否扩容,如果达到扩容标准,将会对数组 2 倍容量进行扩容,扩容过程中原有元素并不是直接拷贝,而是进行原有元素的重新定位存储,如果集合中存在的数据量大,那么这里的操作将会耗时又耗资源。

存储级别是 Memory-Only ,在 shuffle reduce 数据不会溢写,在数据量不大的情况下可以,但是数据量大时,会极易出现OOM。

2.2、ExternalAppendOnlyMap

继承于AppendOnlyMap ,但是存储级别是 Memory and Disk,即在数据量达到一个阈值的时候,会把数据溢写到磁盘,达到释放内存空间,降低 OOM 的风险的作用。

2.3、PartitionedAppendOnlyMap

是 SortShuffleWriter 中用到的一种数据结构,在 Map 端需要聚合的时候,采用这种数据结构,这种结构也是一种 Hash Table,能够根据 Key,通过 hash(Key),把数据插入到相应的位置。

PartitionedAppendOnlyMap 支持 aggregation,它继承了 SizeTrackingAppendOnlyMap,还实现了 WritablePartitionedPairCollection 接口中的 partitionedDestructiveSortedIterator 抽象方法,在该方法中调用了AppendOnlyMap 的 destructiveSortedIterator 对底层数组进行整理和排序后获得迭代器,数据有可能不会连续存放。

2.4、PartitionedPairBuffer

底层实现和 PartitionedAppendOnlyMap 一样都是 ArrayBuffer,主要用于SortShuffleWriter 中 Map 端不采用聚合和排序时使用。

不支持 aggregation,主要功能是插入值是有顺序的,主要起缓冲作用,只有顺序插入,没有 changeValue(聚合)和 update(更新)操作,数据连续存放在尾部。

三、Spark Shuffle 原理

因为 hash based shuffle 已经退出历史舞台,所以以 spark 2.3 的 sort based shuffle 为例,看 Spark Shuffle 的原理。

Shuffle 的整个生命周期由 ShuffleManager 来管理,Spark 2.3中,唯一的支持方式为 SortShuffleManager,SortShuffleManager 中定义了 writer 和 reader 对应shuffle 的 map 和 reduce 阶段。writer 有三种运行模式:

  • BypassMergeSortShuffleWriter:当前 shuffle 没有聚合, 并且分区数小于 spark.shuffle.sort.bypassMergeThreshold(默认200)
  • UnsafeShuffleWriter:当条件不满足 BypassMergeSortShuffleWriter 时, 并且当前 rdd 的数据支持序列化(即 UnsafeRowSerializer),也不需要聚合, 分区数小于 2^24
  • SortShuffleWriter:其余

3.1、BypassMergeSortShuffleWriter

首先,BypassMergeSortShuffleWriter 的运行机制的触发条件如下:

  • shuffle reduce task(即partition)数量小于spark.shuffle.sort.bypassMergeThreshold 参数的值。
  • 没有map side aggregations。
    note: map side aggregations是指在 map 端的聚合操作,通常来说一些聚合类的算子都会都 map 端的 aggregation。不过对于 groupByKey 和combineByKey, 如果设定 mapSideCombine 为false,就不会有 map side aggregations。

图片来源网络(正确的应该有三个 ReduceTask):

《【Spark】Spark 存储原理--shuffle 过程》 image

BypassMergeSortShuffleHandle 算法适用于没有聚合,数据量不大的场景。它为 reduce 端的每个分区,创建一个 DiskBlockObjectWriter。根据 Key 判断分区索引,然后添加到对应的 DiskBlockObjectWriter,写入到对应的临时文件。

《【Spark】Spark 存储原理--shuffle 过程》 image

因为写入磁盘文件是通过 Java的 BufferedOutputStream 实现的,BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。所以图中会有内存缓冲的概念。

最后,会将所有临时文件合并成一个磁盘文件,并创建一个索引文件标识下游各个 reduce task 的数据在文件中的 start offset与 end offset。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一样的,也会创建很多的临时文件(所以触发条件中会有 reduce task 数量限制),只是在最后会做一个磁盘文件的合并,对于 shuffle reader 会更友好一些。

public void write(Iterator> records) throws IOException {
assert (partitiOnWriters== null);
if (!records.hasNext()) {
partitiOnLengths= new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
// DiskBlockObjectWriter 数组,索引是 reduce 端的分区索引
partitiOnWriters= new DiskBlockObjectWriter[numPartitions];
// FileSegment数组,索引是 reduce 端的分区索引
partitiOnWriterSegments= new FileSegment[numPartitions];
// 为每个 reduce 端的分区,创建临时 Block 和文件
for (int i = 0; i final Tuple2 tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
// 遍历数据,根据key找到分区索引,存到对应的文件中
while (records.hasNext()) {
final Product2 record = records.next();
// 获取数据的key
final K key = record._1();
// 根据reduce端的分区器,判断该条数据应该存在reduce端的哪个分区
// 并且通过DiskBlockObjectWriter,存到对应的文件中
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
for (int i = 0; i final DiskBlockObjectWriter writer = partitionWriters[i];
// 调用DiskBlockObjectWriter的commitAndGet方法,获取FileSegment,包含写入的数据信息
partitionWriterSegments[i] = writer.commitAndGet();
writer.close();
}
// 获取最终结果的文件名
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
// 根据output文件名,生成临时文件。临时文件的名称只是在output文件名后面添加了一个uuid
File tmp = Utils.tempFileWith(output);
try {
// 将所有的文件都合并到tmp文件中,返回每个数据段的长度
partitiOnLengths= writePartitionedFile(tmp);
// 这里writeIndexFileAndCommit会将tmp文件重命名,并且会创建索引文件。
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

BypassMergeSortShuffleWriter 所有的中间数据都是在磁盘里,并没有利用内存。而且它只保证分区索引的排序,而并不保证数据的排序。

3.2、SortShuffleWriter

图片来源网络:

《【Spark】Spark 存储原理--shuffle 过程》 image

该模式下,数据首先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。有些 shuffle 操作涉及到聚合,对于这种需要聚合的操作,使用 PartitionedAppendOnlyMap 来排序。对于不需要聚合的,则使用 PartitionedPairBuffer 排序。

在进行 shuffle 之前,map 端会先将数据进行排序。排序的规则,根据不同的场景,会分为两种。首先会根据 Key 将元素分成不同的 partition。第一种只需要保证元素的 partitionId 排序,但不会保证同一个 partitionId 的内部排序。第二种是既保证元素的 partitionId 排序,也会保证同一个 partitionId 的内部排序

接着,往内存写入数据,每隔一段时间,当向 MemoryManager 申请不到足够的内存时,或者数据量超过 spark.shuffle.spill.numElementsForceSpillThreshold 这个阈值时 (默认是 Long 的最大值,不起作用),就会进行 Spill 内存数据到文件,然后清空内存数据结构。假设可以源源不断的申请到内存,那么 Write 阶段的所有数据将一直保存在内存中,由此可见,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比较吃内存的。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件也是通过 Java 的 BufferedOutputStream 实现的。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。在将最终排序结果写入到数据文件之前,需要将内存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已经 spill 到磁盘的 SpillFiles 进行合并。

此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

BypassMergeSortShuffleWriter 与该机制相比:

第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用 BypassMerge 机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销,当然需要满足那两个触发条件。

override def write(records: Iterator[Product2[K, V]]): Unit = {
// 根据是否在map端进行数据合并初始化 ExternalSorter
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
// 不进行聚合,也不进行排序,reduce端再进行排序,只会根据 key 值获取对应的分区 id,来划分数据,不会在分区内排序,如果结果需要排序,例如sortByKey,会在 reduce 端获取 shuffle 数据后进行
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
// shuffle输出文件
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// sorter 中的数据写出到该文件中
val partitiOnLengths= sorter.writePartitionedFile(blockId, tmp)
// 写出对应的index文件,纪录每个Partition对应的偏移量
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
// shuffleWriter的返回结果
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}

3.3、UnsafeShuffleWriter

触发条件有三个:

  • 原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要 Serializer 支持 relocation,在指定位置读取对应数据。 KryoSerializer 和 spark sql 自定义的序列化器 支持这个特性。

  • 没有指定 aggregation 或者key排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序。

  • 分区数目必须小于 16777216 ,因为 partition number 使用24bit 表示的。

下面内容来自 Spark ShuffleWriter 原理。

UnsafeShuffleWriter 首先将数据序列化,保存在 MemoryBlock 中。然后将该数据的地址和对应的分区索引,保存在 ShuffleInMemorySorter 内存中,利用ShuffleInMemorySorter 根据分区排序。当内存不足时,会触发 spill 操作,生成spill 文件。最后会将所有的 spill文 件合并在同一个文件里。

整个过程可以想象成归并排序。ShuffleExternalSorter 负责分片的读取数据到内存,然后利用 ShuffleInMemorySorter 进行排序。排序之后会将结果存储到磁盘文件中。这样就会有很多个已排序的文件, UnsafeShuffleWriter 会将所有的文件合并。

下图来自 Spark ShuffleWriter 原理,表示了map端一个分区的shuffle过程:

《【Spark】Spark 存储原理--shuffle 过程》

UnsafeShuffleWriter 是对 SortShuffleWriter 的优化,大体上也和 SortShuffleWriter 差不多。从内存使用角度看,主要差异在以下两点:

  • 一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存储的是键值或者值的具体类型,也就是 Java 对象,是反序列化过后的数据。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中数据是序列化以后存储到实际的 Page 中,而且在写入数据过程中会额外写入长度信息。总体而言,序列化以后数据大小是远远小于序列化之前的数据。

  • 另一方面,UnsafeShuffleWriter 中需要额外的存储记录(LongArray),它保存着分区信息和实际指向序列化后数据的指针(经过编码的Page num 以及 Offset)。相对于 SortShuffleWriter, UnsafeShuffleWriter 中这部分存储的开销是额外的。

四、后记

这篇文章东拼西凑,大多不是自己的东西,惭愧。

详见:

https://zhangchenchen.github.io/2018/09/26/deep-in-spark-shuffle/

Spark ShuffleWriter 原理


推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • 本文介绍了如何利用Apache POI库高效读取Excel文件中的数据。通过实际测试,除了分数被转换为小数存储外,其他数据均能正确读取。若在使用过程中发现任何问题,请及时留言反馈,以便我们进行更新和改进。 ... [详细]
  • 解决Only fullscreen opaque activities can request orientation错误的方法
    本文介绍了在使用PictureSelectorLight第三方框架时遇到的Only fullscreen opaque activities can request orientation错误,并提供了一种有效的解决方案。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 深入解析Android 4.4中的Fence机制及其应用
    在Android 4.4中,Fence机制是处理缓冲区交换和同步问题的关键技术。该机制广泛应用于生产者-消费者模式中,确保了不同组件之间高效、安全的数据传输。通过深入解析Fence机制的工作原理和应用场景,本文探讨了其在系统性能优化和资源管理中的重要作用。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 深入理解排序算法:集合 1(编程语言中的高效排序工具) ... [详细]
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • 在Linux系统中,通过使用`read`和`write`函数可以实现文件的高效复制操作。`open`函数用于打开或创建文件,其返回值为文件描述符,成功时返回一个有效的文件描述符,失败时返回-1。`path`参数指定了要操作的文件路径,而`oflag`参数则定义了文件的打开模式和属性。此外,为了确保数据的完整性和一致性,还需要合理处理文件读取和写入过程中的错误和异常情况。 ... [详细]
  • 本文深入探讨了 Git 与 SVN 的高效使用技巧,旨在帮助开发者轻松应对版本控制中的各种挑战。通过详细解析两种工具的核心功能与最佳实践,读者将能够更好地掌握版本管理的精髓,提高开发效率。 ... [详细]
  • 在深入研究 React 项目的过程中,特别是在探索 react-router 源码时,我发现了其中蕴含的中间件概念。这激发了我对中间件的进一步思考与整理。本文将详细探讨 Redux 中间件的原理及其在实际项目中的应用,帮助读者更好地理解和使用这一强大工具。通过具体示例和代码解析,我们将揭示中间件如何提升应用的状态管理和异步操作处理能力。 ... [详细]
author-avatar
郎郎2502918483
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有