热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Spark】Spark存储原理shuffle过程

本篇结构:SparkShuffle的发展SparkShuffle中数据结构SparkShuffle原理后记SparkShuffle是sparkjob中某些算子触发的操作。当rdd依

本篇结构:

  • Spark Shuffle 的发展
  • Spark Shuffle 中数据结构
  • Spark Shuffle 原理
  • 后记

Spark Shuffle 是 spark job 中某些算子触发的操作。当 rdd 依赖中出现宽依赖的时候,就会触发 Shuffle 操作,Shuffle 操作通常会伴随着不同 executor/host 之间数据的传输。

Shuffle 操作可能涉及的过程包括数据的排序,聚合,溢写,合并,传输,磁盘IO,网络的 IO 等等。Shuffle 是连接 MapTask 和 ReduceTask 之间的桥梁,Map 的输出到 Reduce 中须经过 Shuffle 环节,Shuffle 的性能高低直接影响了整个程序的性能和吞吐量。

通常 Shuffle 分为两部分:Map 阶段的数据准备( ShuffleMapTask )和Reduce(ShuffleReduceTask) 阶段的数据拷贝处理。一般将在 Map 端的 Shuffle 称之为 Shuffle Write,在 Reduce 端的 Shuffle 称之为 Shuffle Read。

一、Spark Shuffle 的发展

Spark Shuffle 机制总共有三种:

1.1、未优化的 HashShuffle

每一个 ShuffleMapTask 都会为每一个 ReducerTask 创建一个单独的文件,总的文件数是 M * R,其中 M 是 ShuffleMapTask 的数量,R 是 ShuffleReduceTask 的数量。

见下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

在处理大数据时,ShuffleMapTask 和 ShuffleReduceTask 的数量很多,创建的磁盘文件数量 M*R 也越多,大量的文件要写磁盘,再从磁盘读出来,不仅会占用大量的时间,而且每个磁盘文件记录的句柄都会保存在内存中(每个人大约 100k),因此也会占用很大的内存空间,频繁的打开和关闭文件,会导致频繁的GC操作,很容易出现 OOM 的情况。

也正是上述原因,该 HashShuffle 如今已退出历史舞台。

1.2、优化后 HashShuffle

在 Spark 0.8.1 版本中,引入了 Consolidation 机制,该机制是对 HashShuffle 的一种优化。

如下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。

先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。

这样,每个 worker 持有的文件数降为 cores * R。cores 代表核数,R 是 ShuffleReduceTask 数。

1.3、Sort-Based Shuffle

由于 HashShuffle 会产生很多的磁盘文件,引入 Consolidation 机制虽然在一定程度上减少了磁盘文件数量,但是不足以有效提高 Shuffle 的性能,适合中小型数据规模的大数据处理。

为了让 Spark 在更大规模的集群上更高性能处理更大规模的数据,因此在 Spark 1.1版本中,引入了 SortShuffle。

如下图(来源网络):

《【Spark】Spark 存储原理--shuffle 过程》 image

该机制每一个 ShuffleMapTask 都只创建一个文件,将所有的 ShuffleReduceTask 的输入都写入同一个文件,并且对应生成一个索引文件。

以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer 缓存所占用的内存大小,而且同时避免 GC 的风险和频率。

但对于 Rueducer 数比较少的情况,Hash Shuffle 要比 Sort Shuffle 快,因此 Sort Shuffle 有个 “fallback” 计划,对于 Reducers 数少于 “spark.shuffle.sort.bypassMergeThreshold” (200 by default),将使用 fallback 计划,hashing 相关数据到分开的文件,然后合并这些文件为一个。

二、Spark Shuffle 中数据结构

2.1、AppendOnlyMap

AppendOnlyMap 单从命名上来看,是一个只能追加元素的 Map 结构。的确,它是只支持追加的 map,可以修改某个 key 对应的 value,但是不能删除已经存在的 key。

底层是由数组结构实现的,当需要对 Key-Value 进行聚合时,会使用AppendOnlyMap 作为 buffer。在插入或者修改元素的时候,会判断是否扩容,如果达到扩容标准,将会对数组 2 倍容量进行扩容,扩容过程中原有元素并不是直接拷贝,而是进行原有元素的重新定位存储,如果集合中存在的数据量大,那么这里的操作将会耗时又耗资源。

存储级别是 Memory-Only ,在 shuffle reduce 数据不会溢写,在数据量不大的情况下可以,但是数据量大时,会极易出现OOM。

2.2、ExternalAppendOnlyMap

继承于AppendOnlyMap ,但是存储级别是 Memory and Disk,即在数据量达到一个阈值的时候,会把数据溢写到磁盘,达到释放内存空间,降低 OOM 的风险的作用。

2.3、PartitionedAppendOnlyMap

是 SortShuffleWriter 中用到的一种数据结构,在 Map 端需要聚合的时候,采用这种数据结构,这种结构也是一种 Hash Table,能够根据 Key,通过 hash(Key),把数据插入到相应的位置。

PartitionedAppendOnlyMap 支持 aggregation,它继承了 SizeTrackingAppendOnlyMap,还实现了 WritablePartitionedPairCollection 接口中的 partitionedDestructiveSortedIterator 抽象方法,在该方法中调用了AppendOnlyMap 的 destructiveSortedIterator 对底层数组进行整理和排序后获得迭代器,数据有可能不会连续存放。

2.4、PartitionedPairBuffer

底层实现和 PartitionedAppendOnlyMap 一样都是 ArrayBuffer,主要用于SortShuffleWriter 中 Map 端不采用聚合和排序时使用。

不支持 aggregation,主要功能是插入值是有顺序的,主要起缓冲作用,只有顺序插入,没有 changeValue(聚合)和 update(更新)操作,数据连续存放在尾部。

三、Spark Shuffle 原理

因为 hash based shuffle 已经退出历史舞台,所以以 spark 2.3 的 sort based shuffle 为例,看 Spark Shuffle 的原理。

Shuffle 的整个生命周期由 ShuffleManager 来管理,Spark 2.3中,唯一的支持方式为 SortShuffleManager,SortShuffleManager 中定义了 writer 和 reader 对应shuffle 的 map 和 reduce 阶段。writer 有三种运行模式:

  • BypassMergeSortShuffleWriter:当前 shuffle 没有聚合, 并且分区数小于 spark.shuffle.sort.bypassMergeThreshold(默认200)
  • UnsafeShuffleWriter:当条件不满足 BypassMergeSortShuffleWriter 时, 并且当前 rdd 的数据支持序列化(即 UnsafeRowSerializer),也不需要聚合, 分区数小于 2^24
  • SortShuffleWriter:其余

3.1、BypassMergeSortShuffleWriter

首先,BypassMergeSortShuffleWriter 的运行机制的触发条件如下:

  • shuffle reduce task(即partition)数量小于spark.shuffle.sort.bypassMergeThreshold 参数的值。
  • 没有map side aggregations。
    note: map side aggregations是指在 map 端的聚合操作,通常来说一些聚合类的算子都会都 map 端的 aggregation。不过对于 groupByKey 和combineByKey, 如果设定 mapSideCombine 为false,就不会有 map side aggregations。

图片来源网络(正确的应该有三个 ReduceTask):

《【Spark】Spark 存储原理--shuffle 过程》 image

BypassMergeSortShuffleHandle 算法适用于没有聚合,数据量不大的场景。它为 reduce 端的每个分区,创建一个 DiskBlockObjectWriter。根据 Key 判断分区索引,然后添加到对应的 DiskBlockObjectWriter,写入到对应的临时文件。

《【Spark】Spark 存储原理--shuffle 过程》 image

因为写入磁盘文件是通过 Java的 BufferedOutputStream 实现的,BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。所以图中会有内存缓冲的概念。

最后,会将所有临时文件合并成一个磁盘文件,并创建一个索引文件标识下游各个 reduce task 的数据在文件中的 start offset与 end offset。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一样的,也会创建很多的临时文件(所以触发条件中会有 reduce task 数量限制),只是在最后会做一个磁盘文件的合并,对于 shuffle reader 会更友好一些。

public void write(Iterator> records) throws IOException {
assert (partitiOnWriters== null);
if (!records.hasNext()) {
partitiOnLengths= new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
// DiskBlockObjectWriter 数组,索引是 reduce 端的分区索引
partitiOnWriters= new DiskBlockObjectWriter[numPartitions];
// FileSegment数组,索引是 reduce 端的分区索引
partitiOnWriterSegments= new FileSegment[numPartitions];
// 为每个 reduce 端的分区,创建临时 Block 和文件
for (int i = 0; i final Tuple2 tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
// 遍历数据,根据key找到分区索引,存到对应的文件中
while (records.hasNext()) {
final Product2 record = records.next();
// 获取数据的key
final K key = record._1();
// 根据reduce端的分区器,判断该条数据应该存在reduce端的哪个分区
// 并且通过DiskBlockObjectWriter,存到对应的文件中
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
for (int i = 0; i final DiskBlockObjectWriter writer = partitionWriters[i];
// 调用DiskBlockObjectWriter的commitAndGet方法,获取FileSegment,包含写入的数据信息
partitionWriterSegments[i] = writer.commitAndGet();
writer.close();
}
// 获取最终结果的文件名
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
// 根据output文件名,生成临时文件。临时文件的名称只是在output文件名后面添加了一个uuid
File tmp = Utils.tempFileWith(output);
try {
// 将所有的文件都合并到tmp文件中,返回每个数据段的长度
partitiOnLengths= writePartitionedFile(tmp);
// 这里writeIndexFileAndCommit会将tmp文件重命名,并且会创建索引文件。
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

BypassMergeSortShuffleWriter 所有的中间数据都是在磁盘里,并没有利用内存。而且它只保证分区索引的排序,而并不保证数据的排序。

3.2、SortShuffleWriter

图片来源网络:

《【Spark】Spark 存储原理--shuffle 过程》 image

该模式下,数据首先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。有些 shuffle 操作涉及到聚合,对于这种需要聚合的操作,使用 PartitionedAppendOnlyMap 来排序。对于不需要聚合的,则使用 PartitionedPairBuffer 排序。

在进行 shuffle 之前,map 端会先将数据进行排序。排序的规则,根据不同的场景,会分为两种。首先会根据 Key 将元素分成不同的 partition。第一种只需要保证元素的 partitionId 排序,但不会保证同一个 partitionId 的内部排序。第二种是既保证元素的 partitionId 排序,也会保证同一个 partitionId 的内部排序

接着,往内存写入数据,每隔一段时间,当向 MemoryManager 申请不到足够的内存时,或者数据量超过 spark.shuffle.spill.numElementsForceSpillThreshold 这个阈值时 (默认是 Long 的最大值,不起作用),就会进行 Spill 内存数据到文件,然后清空内存数据结构。假设可以源源不断的申请到内存,那么 Write 阶段的所有数据将一直保存在内存中,由此可见,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比较吃内存的。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件也是通过 Java 的 BufferedOutputStream 实现的。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。在将最终排序结果写入到数据文件之前,需要将内存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已经 spill 到磁盘的 SpillFiles 进行合并。

此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

BypassMergeSortShuffleWriter 与该机制相比:

第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用 BypassMerge 机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销,当然需要满足那两个触发条件。

override def write(records: Iterator[Product2[K, V]]): Unit = {
// 根据是否在map端进行数据合并初始化 ExternalSorter
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
// 不进行聚合,也不进行排序,reduce端再进行排序,只会根据 key 值获取对应的分区 id,来划分数据,不会在分区内排序,如果结果需要排序,例如sortByKey,会在 reduce 端获取 shuffle 数据后进行
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
// shuffle输出文件
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// sorter 中的数据写出到该文件中
val partitiOnLengths= sorter.writePartitionedFile(blockId, tmp)
// 写出对应的index文件,纪录每个Partition对应的偏移量
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
// shuffleWriter的返回结果
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}

3.3、UnsafeShuffleWriter

触发条件有三个:

  • 原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要 Serializer 支持 relocation,在指定位置读取对应数据。 KryoSerializer 和 spark sql 自定义的序列化器 支持这个特性。

  • 没有指定 aggregation 或者key排序, 因为 key 没有编码到排序指针中,所以只有 partition 级别的排序。

  • 分区数目必须小于 16777216 ,因为 partition number 使用24bit 表示的。

下面内容来自 Spark ShuffleWriter 原理。

UnsafeShuffleWriter 首先将数据序列化,保存在 MemoryBlock 中。然后将该数据的地址和对应的分区索引,保存在 ShuffleInMemorySorter 内存中,利用ShuffleInMemorySorter 根据分区排序。当内存不足时,会触发 spill 操作,生成spill 文件。最后会将所有的 spill文 件合并在同一个文件里。

整个过程可以想象成归并排序。ShuffleExternalSorter 负责分片的读取数据到内存,然后利用 ShuffleInMemorySorter 进行排序。排序之后会将结果存储到磁盘文件中。这样就会有很多个已排序的文件, UnsafeShuffleWriter 会将所有的文件合并。

下图来自 Spark ShuffleWriter 原理,表示了map端一个分区的shuffle过程:

《【Spark】Spark 存储原理--shuffle 过程》

UnsafeShuffleWriter 是对 SortShuffleWriter 的优化,大体上也和 SortShuffleWriter 差不多。从内存使用角度看,主要差异在以下两点:

  • 一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存储的是键值或者值的具体类型,也就是 Java 对象,是反序列化过后的数据。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中数据是序列化以后存储到实际的 Page 中,而且在写入数据过程中会额外写入长度信息。总体而言,序列化以后数据大小是远远小于序列化之前的数据。

  • 另一方面,UnsafeShuffleWriter 中需要额外的存储记录(LongArray),它保存着分区信息和实际指向序列化后数据的指针(经过编码的Page num 以及 Offset)。相对于 SortShuffleWriter, UnsafeShuffleWriter 中这部分存储的开销是额外的。

四、后记

这篇文章东拼西凑,大多不是自己的东西,惭愧。

详见:

https://zhangchenchen.github.io/2018/09/26/deep-in-spark-shuffle/

Spark ShuffleWriter 原理


推荐阅读
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • 本文介绍了在实现了System.Collections.Generic.IDictionary接口的泛型字典类中如何使用foreach循环来枚举字典中的键值对。同时还讨论了非泛型字典类和泛型字典类在foreach循环中使用的不同类型,以及使用KeyValuePair类型在foreach循环中枚举泛型字典类的优势。阅读本文可以帮助您更好地理解泛型字典类的使用和性能优化。 ... [详细]
  • vue使用
    关键词: ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 怀疑是每次都在新建文件,具体代码如下 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • Oracle seg,V$TEMPSEG_USAGE与Oracle排序的关系及使用方法
    本文介绍了Oracle seg,V$TEMPSEG_USAGE与Oracle排序之间的关系,V$TEMPSEG_USAGE是V_$SORT_USAGE的同义词,通过查询dba_objects和dba_synonyms视图可以了解到它们的详细信息。同时,还探讨了V$TEMPSEG_USAGE的使用方法。 ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 模板引擎StringTemplate的使用方法和特点
    本文介绍了模板引擎StringTemplate的使用方法和特点,包括强制Model和View的分离、Lazy-Evaluation、Recursive enable等。同时,还介绍了StringTemplate语法中的属性和普通字符的使用方法,并提供了向模板填充属性的示例代码。 ... [详细]
author-avatar
郎郎2502918483
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有