Author: 捡耙活哟752 | Source: Internet | 2023-08-28 17:14
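Spark Memory Management Revisited

I previously wrote a first look at how Spark on YARN allocates memory. This time I want to dig into the lower-level details. Link to the earlier post: Spark on YARN Memory Management and Allocation: A First Look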
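1. Static Memory Management

1.1 Storage Memory Allocation

As the code shows, the memory available for storage is:

storage memory = maximum runtime memory × storage fraction (spark.storage.memoryFraction) × safety fraction (spark.storage.safetyFraction)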
```scala
private val MIN_MEMORY_BYTES = 32 * 1024 * 1024

private def getMaxStorageMemory(conf: SparkConf): Long = {
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
```
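So under static memory management, only about 54% (0.6 × 0.9) of a 1 GB heap actually goes to storage. Because the driver and the executors run in separate JVMs with different heap sizes, the actual figure has to be computed separately from --driver-memory (spark.driver.memory) and --executor-memory (spark.executor.memory).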
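To make the 54% concrete, here is a minimal Scala sketch of the same arithmetic. It is not Spark code: `StaticStorageSketch` and `maxStorageMemory` are hypothetical names, a plain `Map` stands in for `SparkConf`, and the heap size is passed in explicitly instead of being read from `Runtime.getRuntime.maxMemory`.

```scala
// Hypothetical sketch of the static storage-memory formula; not Spark's API.
object StaticStorageSketch {
  /** heapBytes × spark.storage.memoryFraction × spark.storage.safetyFraction */
  def maxStorageMemory(heapBytes: Long, conf: Map[String, Double] = Map.empty): Long = {
    val memoryFraction = conf.getOrElse("spark.storage.memoryFraction", 0.6)
    val safetyFraction = conf.getOrElse("spark.storage.safetyFraction", 0.9)
    (heapBytes * memoryFraction * safetyFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    val oneGiB = 1024L * 1024 * 1024
    val storage = maxStorageMemory(oneGiB)
    // Roughly 553 MB, i.e. 0.6 × 0.9 = 54% of a 1 GB heap
    println(f"storage = ${storage / (1024.0 * 1024)}%.1f MB, ${100.0 * storage / oneGiB}%.1f%% of heap")
  }
}
```

Passing the heap size in as a parameter makes it easy to plug in the driver's and the executors' heap sizes separately.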
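1.2 Execution Memory Allocation

Execution memory is computed the same way as storage memory; only the fractions differ (spark.shuffle.memoryFraction, default 0.2, and spark.shuffle.safetyFraction, default 0.8). The code also rejects heaps and executor memory smaller than 32 MB.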
```scala
private def getMaxExecutionMemory(conf: SparkConf): Long = {
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

  if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
      s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
```
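Likewise, under static memory management only about 16% (0.2 × 0.8) of a 1 GB heap actually goes to execution. The driver and executor figures again have to be computed from their respective configured heap sizes.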
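The execution side can be sketched the same way, including the 32 MB lower bound from `getMaxExecutionMemory`. Again the names are hypothetical and a `Map` replaces `SparkConf`; `require` (which throws `IllegalArgumentException`, like the original) stands in for the explicit `throw`, and the separate `spark.executor.memory` check is omitted.

```scala
// Hypothetical sketch of the static execution-memory formula; not Spark's API.
object StaticExecutionSketch {
  val MinMemoryBytes: Long = 32L * 1024 * 1024

  /** heapBytes × spark.shuffle.memoryFraction × spark.shuffle.safetyFraction */
  def maxExecutionMemory(heapBytes: Long, conf: Map[String, Double] = Map.empty): Long = {
    // Mirrors the "System memory ... must be at least ..." check; require throws IllegalArgumentException
    require(heapBytes >= MinMemoryBytes,
      s"System memory $heapBytes must be at least $MinMemoryBytes")
    val memoryFraction = conf.getOrElse("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getOrElse("spark.shuffle.safetyFraction", 0.8)
    (heapBytes * memoryFraction * safetyFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    val oneGiB = 1024L * 1024 * 1024
    val execution = maxExecutionMemory(oneGiB)
    // Roughly 164 MB, i.e. 0.2 × 0.8 = 16% of a 1 GB heap
    println(f"execution = ${execution / (1024.0 * 1024)}%.1f MB")
  }
}
```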
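1.3 Off-Heap Memory Reallocation

Static memory management does not support using off-heap memory for storage, so all off-heap memory is given to execution: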
```scala
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
```
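These two calls simply move the entire off-heap storage pool into the off-heap execution pool. A toy model makes the effect visible. `ToyMemoryPool` and `OffHeapReallocationSketch` are hypothetical stand-ins that only track a pool size, whereas Spark's real pools also track how much memory is in use.

```scala
// Hypothetical toy pool that only tracks its size; not Spark's MemoryPool.
final class ToyMemoryPool(initialSize: Long) {
  private var size: Long = initialSize
  def poolSize: Long = size
  def incrementPoolSize(delta: Long): Unit = {
    require(delta >= 0, s"delta must be non-negative: $delta")
    size += delta
  }
  def decrementPoolSize(delta: Long): Unit = {
    require(delta >= 0 && delta <= size, s"cannot shrink pool of $size by $delta")
    size -= delta
  }
}

object OffHeapReallocationSketch {
  /** Static memory management: hand all off-heap storage memory to execution. */
  def reallocate(offHeapStorage: ToyMemoryPool, offHeapExecution: ToyMemoryPool): Unit = {
    val storageSize = offHeapStorage.poolSize
    offHeapExecution.incrementPoolSize(storageSize)
    offHeapStorage.decrementPoolSize(storageSize)
  }

  def main(args: Array[String]): Unit = {
    val storage = new ToyMemoryPool(512L) // sizes in MB for readability
    val execution = new ToyMemoryPool(512L)
    reallocate(storage, execution)
    println(s"storage = ${storage.poolSize} MB, execution = ${execution.poolSize} MB")
    // prints "storage = 0 MB, execution = 1024 MB"
  }
}
```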
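2. Unified Memory Management

2.1 Overview

Unified memory management places a soft boundary between execution and storage, so that either side can borrow memory from the other. Spark first reserves 300 MB of the heap for the system; the region shared by execution and storage is a fraction of the remaining memory (heap size minus 300 MB), set by spark.memory.fraction, which defaults to 0.6.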
This shared region is divided further: spark.memory.storageFraction sets the share of it set aside for storage, 0.5 by default. By default, then, the storage region is 0.6 × 0.5 = 0.3 of (heap size minus 300 MB).
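For a 1 GB heap with default settings the arithmetic works out as follows. This is a hypothetical sketch (`UnifiedRegionSketch` and `regions` are illustrative names) that mirrors `getMaxMemory` and the `onHeapStorageRegionSize` computation in this section's code, assuming the heap size is passed in directly.

```scala
// Hypothetical sketch of unified-memory sizing; not Spark's API.
object UnifiedRegionSketch {
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024

  /** Returns (shared execution + storage region, storage region) in bytes. */
  def regions(heapBytes: Long,
              memoryFraction: Double = 0.6,  // spark.memory.fraction
              storageFraction: Double = 0.5  // spark.memory.storageFraction
             ): (Long, Long) = {
    val usable = heapBytes - ReservedSystemMemoryBytes
    val shared = (usable * memoryFraction).toLong
    val storageRegion = (shared * storageFraction).toLong
    (shared, storageRegion)
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024.0 * 1024
    val (shared, storage) = regions(1024L * 1024 * 1024)
    // (1024 - 300) × 0.6 ≈ 434.4 MB shared; half of that, ≈ 217.2 MB, is the storage region
    println(f"shared = ${shared / mb}%.1f MB, storage region = ${storage / mb}%.1f MB")
  }
}
```

So with a 1 GB heap the storage region is about 21% of the heap rather than 30%; the 0.3 figure is only approached as the heap grows and the fixed 300 MB reservation becomes negligible.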
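Storage can borrow as much unused execution memory as is available, until execution needs that memory back and reclaims it. When execution reclaims memory from storage, cached blocks are evicted from memory until enough of the borrowed memory has been released to satisfy execution's request. Likewise, execution can borrow as much free storage memory as is available, but execution memory is never evicted by storage. As a result, an attempt to cache a block may fail if execution has already taken most of the storage memory; in that case the new block is evicted immediately according to its storage level.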
```scala
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
    if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
  (usableMemory * memoryFraction).toLong
}

def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
  val maxMemory = getMaxMemory(conf)
  new UnifiedMemoryManager(
    conf,
    maxHeapMemory = maxMemory,
    onHeapStorageRegionSize =
      (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
    numCores = numCores)
}
```
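One detail of `getMaxMemory` worth spelling out: the heap must be at least 1.5 times the reserved memory, i.e. 300 MB × 1.5 = 450 MB, or an `IllegalArgumentException` is thrown. A hypothetical sketch of that check follows; `UnifiedMinHeapSketch` and `maxUnifiedMemory` are illustrative names, and the `spark.testing.*` overrides and the executor-memory check are left out.

```scala
// Hypothetical sketch of getMaxMemory's lower bound; not Spark's API.
object UnifiedMinHeapSketch {
  val ReservedSystemMemoryBytes: Long = 300L * 1024 * 1024
  val MinSystemMemoryBytes: Long = (ReservedSystemMemoryBytes * 1.5).ceil.toLong // 450 MB

  /** Shared execution + storage memory; throws IllegalArgumentException if the heap is too small. */
  def maxUnifiedMemory(heapBytes: Long, memoryFraction: Double = 0.6): Long = {
    require(heapBytes >= MinSystemMemoryBytes,
      s"System memory $heapBytes must be at least $MinSystemMemoryBytes")
    ((heapBytes - ReservedSystemMemoryBytes) * memoryFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    println(scala.util.Try(maxUnifiedMemory(400L * 1024 * 1024))) // Failure(...): 400 MB < 450 MB
    println(maxUnifiedMemory(512L * 1024 * 1024) / (1024 * 1024)) // (512 - 300) × 0.6, about 127 MB
  }
}
```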
2.2 Execution Memory Allocation

Borrowing rules for the execution pool:
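- If storage has borrowed part of the execution pool's memory, execution first forcibly reclaims the space that originally belonged to it, evicting cached blocks if necessary.
- If the storage pool has free memory, execution can take it, and storage cannot forcibly take it back.
- If storage's own region is completely full, execution can neither evict it nor borrow from it.

```scala
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
  if (extraMemoryNeeded > 0) {
    // Reclaimable: free storage memory, or whatever storage holds beyond its own region, whichever is larger
    val memoryReclaimableFromStorage = math.max(
      storagePool.memoryFree,
      storagePool.poolSize - storageRegionSize)
    if (memoryReclaimableFromStorage > 0) {
      val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
        math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
      storagePool.decrementPoolSize(spaceToReclaim)
      executionPool.incrementPoolSize(spaceToReclaim)
    }
  }
}
```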
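The three rules can be checked against a toy model of `maybeGrowExecutionPool`. This is a sketch under simplifying assumptions: `ExecutionBorrowSketch` and `Pools` are hypothetical, sizes are plain numbers in MB, and eviction is assumed to always free the requested amount, whereas Spark's `freeSpaceToShrinkPool` first releases free storage memory and then evicts cached blocks, which may free less than requested.

```scala
// Hypothetical toy model of maybeGrowExecutionPool; not Spark's API.
object ExecutionBorrowSketch {
  final case class Pools(storagePoolSize: Long, storageUsed: Long,
                         executionPoolSize: Long, storageRegionSize: Long) {
    def storageFree: Long = storagePoolSize - storageUsed
  }

  def maybeGrowExecutionPool(p: Pools, extraMemoryNeeded: Long): Pools = {
    if (extraMemoryNeeded <= 0) p
    else {
      // Free storage memory, or whatever storage holds beyond its own region, whichever is larger
      val reclaimable = math.max(p.storageFree, p.storagePoolSize - p.storageRegionSize)
      if (reclaimable <= 0) p // rule 3: storage's own region is full and nothing is borrowed
      else {
        val spaceToReclaim = math.min(extraMemoryNeeded, reclaimable)
        // Use free storage memory first (rule 2); evict cached blocks only for the rest (rule 1)
        val evicted = math.max(0L, spaceToReclaim - p.storageFree)
        p.copy(
          storagePoolSize = p.storagePoolSize - spaceToReclaim,
          storageUsed = p.storageUsed - evicted,
          executionPoolSize = p.executionPoolSize + spaceToReclaim)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Storage region is 200 MB, but storage has grown to 300 MB (fully used) by borrowing 100 MB
    val borrowed = Pools(storagePoolSize = 300, storageUsed = 300, executionPoolSize = 100, storageRegionSize = 200)
    println(maybeGrowExecutionPool(borrowed, 150)) // Pools(200,200,200,200): 100 MB reclaimed by eviction
  }
}
```

Note that execution asked for 150 MB but only got the 100 MB that storage had borrowed; storage's own 200 MB region stays untouched.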
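The maximum execution pool size is the size the execution pool would have after evicting storage memory, that is, after taking all free storage memory and evicting whatever storage holds beyond its own region.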
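This maximum is divided evenly among the active tasks to cap the execution memory each task can get. It is important that it be larger than the current execution pool size, because the pool size does not count memory that could still be freed by evicting storage; otherwise Spark can hit SPARK-12155.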
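Additionally, this value should stay below maxMemory to keep execution memory fairly shared among tasks. Otherwise, a task could take more than its fair share of execution memory, wrongly assuming that other tasks can acquire the portion of storage memory that cannot be evicted.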
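A minimal sketch of the two quantities discussed above, assuming the formula used by `computeMaxExecutionPoolSize` in Spark 2.x's `UnifiedMemoryManager` (maxMemory minus the smaller of storage's used memory and its region size). `MaxExecutionPoolSketch` and its method names are hypothetical.

```scala
// Hypothetical sketch; not Spark's API.
object MaxExecutionPoolSketch {
  /** Largest size the execution pool can reach: everything except storage's usage within its own region. */
  def maxExecutionPoolSize(maxMemory: Long, storageUsed: Long, storageRegionSize: Long): Long =
    maxMemory - math.min(storageUsed, storageRegionSize)

  /** Per-task cap: the maximum pool size divided evenly among active tasks. */
  def maxMemoryPerTask(maxPoolSize: Long, numActiveTasks: Int): Long = {
    require(numActiveTasks > 0, "need at least one active task")
    maxPoolSize / numActiveTasks
  }

  def main(args: Array[String]): Unit = {
    // 400 MB shared region, 200 MB storage region, storage using 300 MB (100 MB borrowed),
    // so the execution pool itself is currently only 100 MB
    val maxPool = maxExecutionPoolSize(maxMemory = 400, storageUsed = 300, storageRegionSize = 200)
    println(maxPool)                      // 200: larger than the current 100 MB pool, at most maxMemory
    println(maxMemoryPerTask(maxPool, 4)) // 50
  }
}
```

Capping each task with the current pool size (100 MB here) would ignore the 100 MB that eviction could free, which is the situation the SPARK-12155 remark warns about.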
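2.3 Storage Memory Allocation

When a block needs more memory than the storage pool has free, storage borrows from execution:

```scala
if (numBytes > storagePool.memoryFree) {
  // Borrow only what is missing, capped at execution's free memory
  val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
    numBytes - storagePool.memoryFree)
  executionPool.decrementPoolSize(memoryBorrowedFromExecution)
  storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
```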
Borrowing rules for the storage pool:
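- When the execution pool has free memory, storage can borrow it:
  - if the amount needed is less than execution's free memory, storage borrows exactly what it needs;
  - if it exceeds execution's free memory, storage can borrow only all of the free memory and cannot evict anything.
- When execution already occupies part of the memory that originally belonged to storage, storage still cannot evict it to get that memory back.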
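The storage-side rules can be modeled the same way. `StorageBorrowSketch` and its `Pools` case class are hypothetical; the body mirrors the snippet at the start of 2.3, so storage only ever takes execution's free memory and never evicts execution memory.

```scala
// Hypothetical toy model of storage borrowing from execution; not Spark's API.
object StorageBorrowSketch {
  final case class Pools(storagePoolSize: Long, storageFree: Long,
                         executionPoolSize: Long, executionFree: Long)

  /** Grow the storage pool toward numBytes of free space, using only execution's free memory. */
  def borrowForStorage(p: Pools, numBytes: Long): Pools = {
    if (numBytes <= p.storageFree) p // enough free storage memory: no borrowing needed
    else {
      // Borrow just what is missing, capped at execution's free memory; nothing is evicted
      val borrowed = math.min(p.executionFree, numBytes - p.storageFree)
      p.copy(
        storagePoolSize = p.storagePoolSize + borrowed,
        storageFree = p.storageFree + borrowed,
        executionPoolSize = p.executionPoolSize - borrowed,
        executionFree = p.executionFree - borrowed)
    }
  }

  def main(args: Array[String]): Unit = {
    val pools = Pools(storagePoolSize = 200, storageFree = 50, executionPoolSize = 200, executionFree = 100)
    println(borrowForStorage(pools, 80))  // Pools(230,80,170,70): borrows exactly the missing 30
    println(borrowForStorage(pools, 300)) // Pools(300,150,100,0): borrows all 100 free, no eviction
  }
}
```

In the second call storage still has only 150 MB free, so a 300 MB request cannot be satisfied from execution memory alone.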