热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark内存管理再探

Spark内存管理再探之前写过一篇Sparkonyarn的内存管理分配,初探,这次再来深入了解它更加底层的一些东西,之前博客的连接Sparkonyarn内存管理分配初探1.静态

Spark内存管理再探

之前写过一篇Spark on yarn的内存管理分配,初探,这次再来深入了解它更加底层的一些东西,之前博客的连接 Spark on yarn 内存管理分配初探


1. 静态内存管理


1.1存储内存分配

通过代码可以看出,存储空间可用内存 = 运行时最大内存 x 分配给存储空间的比例 x 安全系数

// 默认最小内存为32M,单位为字节
private val MIN_MEMORY_BYTES = 32 * 1024 * 1024// 获取存储空间最大内存,单位为字节
private def getMaxStorageMemory(conf: SparkConf): Long = {// 从JVM运行时数据区中获取,拿到值的是堆的大小,实际值会小于堆区大小-Xmxval systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)// 分配给存储空间的内存比例val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)// 实际可用区域需要再乘以一个安全系数val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)(systemMaxMemory * memoryFraction * safetyFraction).toLong
}

所以静态内存管理中,1G的堆区,实际分配给存储空间的内存大约只有其中的 ***54%***,由于driver端与executor端使用的是不同的jvm,造成堆区的内存大小不同,需要根据 –driver-memory(spark.driver.memory) 以及 –executor-memory(spark.executor.memory) 来具体计算


1.2执行内存分配

执行内存跟存储内存类似,就是分配的比例不同

// 获取执行空间最大内存,单位为字节
private def getMaxExecutionMemory(conf: SparkConf): Long &#61; {// 从JVM运行时数据区中获取&#xff0c;拿到值的是堆的大小&#xff0c;实际值会小于堆区大小-Xmxval systemMaxMemory &#61; conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)// 堆区内存&#xff08;系统内存&#xff09;需要大于32M的阈值if (systemMaxMemory < MIN_MEMORY_BYTES) {throw new IllegalArgumentException(s"System memory $systemMaxMemory must " &#43;// 这里可以发现&#xff0c;通过调整driver端的内存大小来增加堆区大小s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " &#43;s"option or spark.driver.memory in Spark configuration.")}// 读取配置文件&#xff0c;配置执行空间内存if (conf.contains("spark.executor.memory")) {val executorMemory &#61; conf.getSizeAsBytes("spark.executor.memory")// 执行空间内存也需要大于32M的阈值if (executorMemory < MIN_MEMORY_BYTES) {throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " &#43;// 通过调整executor端分配的内存来增加执行空间内存s"$MIN_MEMORY_BYTES. Please increase executor memory using the " &#43;s"--executor-memory option or spark.executor.memory in Spark configuration.")}}// 分配给执行空间的内存比例val memoryFraction &#61; conf.getDouble("spark.shuffle.memoryFraction", 0.2)// 实际可用区域需要再乘以一个安全系数val safetyFraction &#61; conf.getDouble("spark.shuffle.safetyFraction", 0.8)(systemMaxMemory * memoryFraction * safetyFraction).toLong
}

在静态内存管理中&#xff0c;在 1g 的堆区大小下&#xff0c;实际分配给执行空间的内存大约只有其中的 16%,driver跟executor的分配也同样需要指定的大小具体计算.


1.3堆外内存重新分配

静态内存管理不支持使用堆外内存做存储空间&#xff0c;因此将其全部分配给执行空间

// 将用作存储空间的堆外内存池全部重新分配给执行空间的堆外内存池
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
// 将存储空间的堆外内存池清零
offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)

2.统一内存管理


2.1介绍

统一内存管理在执行空间和存储空间之间设置了一个软边界&#xff0c;这样任何一方都可以从另一方借用内存。执行和存储之间共享的区域默认占总的堆区的300M&#xff0c;可通过 spark.memory.fraction 配置&#xff0c;默认值为0.6。

这个共享区域可以进行更细的划分&#xff0c;例如在共享空间中&#xff0c;通过 spark.memory.storagefraction 设置存储空间占用的比重&#xff0c;默认为0.5。这就意味着默认情况下存储区域的大小为堆空间的0.6 * 0.5&#61;0.3。

存储可以尽可能多地借用没有使用的执行空间内存&#xff0c;直到执行空间收回需要使用时&#xff0c;将原先部分回收。当执行空间回收存储空间内存时&#xff0c;缓存块将从内存中移出&#xff0c;直到释放足够的借用内存以满足执行空间所需的内存请求。同样&#xff0c;执行可以借用尽可能多的空闲存储内存&#xff0c;但是执行内存不会被存储空间驱逐。这意味着如果执行空间吃掉了大部分存储空间的内存&#xff0c;缓存块的尝试可能会失败。这种情况下&#xff0c;新块将根据其各自的存储级别立即收回。

// 留了300M的预留空间
private val RESERVED_SYSTEM_MEMORY_BYTES &#61; 300 * 1024 * 1024private def getMaxMemory(conf: SparkConf): Long &#61; {// 获取jvm的最大内存val systemMemory &#61; conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)// 计算预留内存val reservedMemory &#61; conf.getLong("spark.testing.reservedMemory",if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)// 计算jvm最小内存&#xff0c;为预留内存的1.5倍&#xff0c;向上取整val minSystemMemory &#61; (reservedMemory * 1.5).ceil.toLong// 要求系统内存应该为预留内存的1.5倍以上&#xff0c;否则会造成内存不足,程序也就不会往下执行if (systemMemory < minSystemMemory) {throw new IllegalArgumentException(s"System memory $systemMemory must " &#43;s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " &#43;s"option or spark.driver.memory in Spark configuration.")}// SPARK-12759 Check executor memory to fail fast if memory is insufficient// 检查executor端内存是否足够if (conf.contains("spark.executor.memory")) {val executorMemory &#61; conf.getSizeAsBytes("spark.executor.memory")if (executorMemory < minSystemMemory) {throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " &#43;s"$minSystemMemory. Please increase executor memory using the " &#43;s"--executor-memory option or spark.executor.memory in Spark configuration.")}}// 计算剩余内存val usableMemory &#61; systemMemory - reservedMemory// 计算可用内存val memoryFraction &#61; conf.getDouble("spark.memory.fraction", 0.6)(usableMemory * memoryFraction).toLong}//默认存储内存跟执行内存的计算方式
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager &#61; {val maxMemory &#61; getMaxMemory(conf)new UnifiedMemoryManager(conf,maxHeapMemory &#61; maxMemory,// 默认存储空间占用可用空间的一半onHeapStorageRegionSize &#61;(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,numCores &#61; numCores)}

2.2执行空间内存分配


执行池的借用规则



  1. 当执行池的部分内存被存储池借用时&#xff0c;首先将原本属于自己的空间强制回收
  2. 当存储池有空闲内存时&#xff0c;可以占用存储池的空间&#xff0c;存储池无法强制回收
  3. 当存储池原本就属于自己的内存都占满时&#xff0c;执行池无法强制驱逐&#xff0c;也无法占用

def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit &#61; {// 当执行空间内存不足时if (extraMemoryNeeded > 0) {// 借用规则val memoryReclaimableFromStorage &#61; math.max(storagePool.memoryFree,storagePool.poolSize - storageRegionSize)if (memoryReclaimableFromStorage > 0) {// 只回收足够执行任务的内存&#xff0c;而不是一次性收回val spaceToReclaim &#61; storagePool.freeSpaceToShrinkPool(// 有可能没办法一次性收回所需要的内存&#xff0c;需要分多次收回math.min(extraMemoryNeeded, memoryReclaimableFromStorage))storagePool.decrementPoolSize(spaceToReclaim)executionPool.incrementPoolSize(spaceToReclaim)}}}

最大的执行池&#xff0c;等于驱逐完存储内存后的执行池的大小

执行区内存池自身最大的内存量平均分配给活动任务&#xff0c;以限制每个任务的执行内存分配。保持这个值大于执行池的大小是很重要的&#xff0c;因为执行池大小不会将通过移出存储空间释放内存作为潜在内存。SPARK-12155

此外&#xff0c;该值应保持在 maxMemory 以下&#xff0c;以权衡任务间执行内存分配的公平性&#xff0c;否则&#xff0c;任务可能会占用执行内存的公平份额&#xff0c;错误地认为其他任务可以获取无法收回的存储内存部分。


2.3存储空间内存分配

if (numBytes > storagePool.memoryFree) {// 当存储池空间不够时&#xff0c;可以向执行池借空闲内存val memoryBorrowedFromExecution &#61; Math.min(executionPool.memoryFree,numBytes - storagePool.memoryFree)// 更新存储池和执行池的内存值executionPool.decrementPoolSize(memoryBorrowedFromExecution)storagePool.incrementPoolSize(memoryBorrowedFromExecution)}

存储池的借用规则



  1. 当执行池有空闲内存时&#xff0c;可以借用执行池的内存
  2. 如果借用的量小于空闲内存&#xff0c;借刚好的内存量就够了
  3. 如果借用的量大于空闲内存&#xff0c;只能将空闲内存全借了&#xff0c;但是无法进行驱逐
  4. 当存储池原本就有一部分内存被执行池占用时&#xff0c;也无法将原本属于存储池的内存进行驱逐。

推荐阅读
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 本文探讨了Android系统中支持的图像格式及其在不同版本中的兼容性问题,重点涵盖了存储、HTTP传输、相机功能以及SparseArray的应用。文章详细分析了从Android 10 (API 29) 到Android 11 的存储规范变化,并讨论了这些变化对图像处理的影响。此外,还介绍了如何通过系统升级和代码优化来解决版本兼容性问题,以确保应用程序在不同Android版本中稳定运行。 ... [详细]
  • 在C#中开发MP3播放器时,我正在考虑如何高效存储元数据以便快速检索。选择合适的数据结构,如字典或数组,对于优化性能至关重要。字典能够提供快速的键值对查找,而数组则在连续存储和遍历方面表现优异。根据具体需求,合理选择数据结构将显著提升应用的响应速度和用户体验。 ... [详细]
  • FastDFS Nginx 扩展模块的源代码解析与技术剖析
    FastDFS Nginx 扩展模块的源代码解析与技术剖析 ... [详细]
  • 揭秘腾讯云CynosDB计算层设计优化背后的不为人知的故事与技术细节
    揭秘腾讯云CynosDB计算层设计优化背后的不为人知的故事与技术细节 ... [详细]
  • 本文详细探讨了OpenCV中人脸检测算法的实现原理与代码结构。通过分析核心函数和关键步骤,揭示了OpenCV如何高效地进行人脸检测。文章不仅提供了代码示例,还深入解释了算法背后的数学模型和优化技巧,为开发者提供了全面的理解和实用的参考。 ... [详细]
  • 在CentOS上部署和配置FreeSWITCH
    在CentOS系统上部署和配置FreeSWITCH的过程涉及多个步骤。本文详细介绍了从源代码安装FreeSWITCH的方法,包括必要的依赖项安装、编译和配置过程。此外,还提供了常见的配置选项和故障排除技巧,帮助用户顺利完成部署并确保系统的稳定运行。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
author-avatar
捡耙活哟752
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有