作者:手机用户2502907707 | 来源:互联网 | 2023-08-10 15:53
1.SparkShuffle调优shuffle在spark的算子中产生,也就是运行task的时候才会产生shuffle.2.sortShuffleManagersparkshuff
1.Spark Shuffle调优
shuffle在spark的算子中产生,也就是运行task的时候才会产生shuffle.
2.sortShuffleManager
spark shuffle的默认计算引擎叫sortshuffleManager,它负责shuffle过程的执行、计算和组件的处理,sortshuffleManager会将task进行shuffle操作时产生的临时磁盘文件合并成一个磁盘文件,在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。
3.Shuffle当中可能遇到的问题
1. 数据量非常大,从其他各台机器收集数据占用大量网络。
2. 数据如何分类,即如何Partition,Hash、Sort等;
3. 负载均衡(数据倾斜),因为采用不同的Shuffle方式对数据不同的分类,而分类之后又要跑到具体的节点上计算,如果不恰当的话,很容易产生数据倾斜;
4. 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考虑的问题;
说明:具体的Task进行计算的时候尽一切最大可能使得数据具备Process Locality的特性;退而求次是增加数据分片,减少每个Task处理的数据量。
4.参数
1. spark.shuffle.file.buffer(默认值为32K)
该参数是缓冲区的缓冲内存,如果可用的内存资源较为充足的话,可以将缓冲区的值设置大点,这样会较少磁盘IO次数.,如果合理调节该参数,性能会提升1%~5%… 可以设置为64K。
2. spark.reducer.max.SizeFlight(默认为48M)
该参数是stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作,如果合理调节该参数(增大),性能会提升1%~5%…
3. Saprk.shuffle.memoryFraction(默认20%)shuffle聚合内存的比例
该参数是数据根据不同的shuffle算子将数据写入内存结构中,内存结构达到阈值会溢出临时文件,这个参数就是则是内存结构的阈值百分比的,不是内存结构的内存大小. 如果内存充足,而且很少使用持久化操作,建议调高这个比例,可以减少频繁对磁盘进行IO操作,合理调节该参数可以将性能提升10%左右。
4. spark.shuffle.io.maxRetries拉取数据重试次数(默认3次)
该参数是stage的task向上一个stage的task计算结果拉取数据,也就是上面那个操作,有时候会因为网络异常原因,导致拉取失败,失败时候默认重新拉取三次,三次过还是失败的话作业就执行失败了,根据具体的业务可以考虑将默认值增大,这样可以避免由于JVM的一些原因或者网络不稳定等因素导致的数据拉取失败.也有助于提高spark作业的稳定性. 可以适当的提升重新拉取的次数,最大为60次.
5.spark.shuffle.io.retryWait(默认是5s)—– 重试间隔时间60s
是每次拉取数据的间隔时间… 建议加大间隔时长(比60s),以增加shuffle操作的稳定性
6. Spark Shuffle的种类
7. HashShuffle 合并机制
合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
8. spark.shuffle.sort.bypassMergeThreshold — 200 SortShuffle bypass机制 200次
Executor的堆外内存调优
为了优化JVM和的读写速度,这里的内核kernel与JVM共用同一个堆外内存。这样当JVM中的Executor需要读取数据时,将请求发送给Kernel,kernel将数据准备好,直接拉取到堆外内存中,那么JVM中的Executor就可以直接去堆外内存中拉取数据即可,所以可以增加Executor的堆外内存大小,来提升Executor的读写效率。
Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外内存(netty是零拷贝),所以使用了堆外内存。默认情况下,这个堆外内存上限默认是每一个executor的内存大小的10%;真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G。
executor在进行shuffle write,优先从自己本地关联的mapOutPutWorker中获取某份数据,如果本地block manager没有的话,那么会通过TransferService,去远程连接其他节点上executor的block manager去获取,尝试建立远程的网络连接,并且去拉取数据。频繁创建对象让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。处于垃圾回过程中,所有的工作线程全部停止;相当于只要一旦进行垃圾回收,spark / executor停止工作,无法提供响应,spark默认的网络连接的超时时长是60s;如果卡住60s都无法建立连接的话,那么这个task就失败了。task失败了就会出现shuffle file cannot find的错误。
那么如何调节等待的时长呢?
在./spark-submit提交任务的脚本里面添加:
–conf spark.core.connection.ack.wait.timeout=300
Executor由于内存不足或者堆外内存不足了,挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数据。
我们可以调节堆外内存的大小,如何调节?
在./spark-submit提交任务的脚本里面添加
yarn下:
–conf spark.yarn.executor.memoryOverhead=2048 单位M
standalone下:
–conf spark.executor.memoryOverhead=2048单位M