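SparkEnv is Spark's execution environment object. It holds the many objects involved in running Executors. In local mode the Driver creates the Executor, while in cluster mode the CoarseGrainedExecutorBackend process launched by a Worker creates it. A SparkEnv therefore exists either in the Driver process or in a CoarseGrainedExecutorBackend process.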
The primary constructor of SparkEnv, together with the global accessor in its companion object, is defined as follows:
```scala
/**
 * :: DeveloperApi ::
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
 * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
 */
@DeveloperApi
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging {
  ...
}

object SparkEnv extends Logging {
  // The global SparkEnv instance; @volatile so that every thread sees the same value
  @volatile private var env: SparkEnv = _

  /**
   * Returns the SparkEnv.
   */
  def get: SparkEnv = {
    env
  }
  ...
}
```
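The following is a minimal sketch of the global-accessor pattern above, using only the Scala standard library. `ToyEnv`, `ToyEnvHolder` and `ToyEnvDemo` are hypothetical names, not Spark classes. The holder keeps a single `@volatile` reference. Once one thread publishes an environment, every other thread that calls `get` sees the same instance.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for SparkEnv; it only carries an executor ID.
final class ToyEnv(val executorId: String)

// Same shape as `object SparkEnv`: one process-wide reference, marked @volatile so that the
// write made by the thread that creates the environment is visible to all reader threads.
object ToyEnvHolder {
  @volatile private var env: ToyEnv = null

  def set(e: ToyEnv): Unit = { env = e }

  def get: ToyEnv = env
}

object ToyEnvDemo {
  /** Publishes an environment, then reads it back from `n` threads; returns what each thread saw. */
  def readFromThreads(n: Int): Seq[ToyEnv] = {
    ToyEnvHolder.set(new ToyEnv("driver"))
    val seen = new ConcurrentLinkedQueue[ToyEnv]()
    val threads = (1 to n).map(_ => new Thread(() => { seen.add(ToyEnvHolder.get); () }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    seen.toArray(Array.empty[ToyEnv]).toSeq
  }
}
```

`@volatile` only guarantees visibility of the latest write. It does not stop two threads from both calling `set`, so in this model whoever creates the environment is expected to publish it once.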
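SecurityManager is mainly responsible for permission and account settings. When Hadoop YARN is the cluster manager, a secret key has to be generated from the credentials for authentication. When authentication is enabled (`authOn` in the code below), SecurityManager also installs a default password Authenticator for the current JVM, implemented as an anonymous inner class.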
```scala
/**
 * Spark class responsible for security.
 *
 * In general this class should be instantiated by the SparkEnv and most components
 * should access it from that. There are some cases where the SparkEnv hasn't been
 * initialized yet and this class must be instantiated directly.
 *
 * This class implements all of the configuration related to security features described
 * in the "Security" document. Please refer to that document for specific features implemented
 * here. (Security document: https://spark.apache.org/docs/latest/security.html)
 */
private[spark] class SecurityManager(
    sparkConf: SparkConf,
    val ioEncryptionKey: Option[Array[Byte]] = None,
    authSecretFileConf: ConfigEntry[Option[String]] = AUTH_SECRET_FILE)
  extends Logging with SecretKeyHolder {

  // Set our own authenticator to properly negotiate user/password for HTTP connections.
  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
  // only set once.
  // It supplies the user name and password whenever the HTTP client fetches from an HTTP server,
  // because communication between Spark nodes often has to negotiate credentials dynamically.
  if (authOn) {
    Authenticator.setDefault(
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          var passAuth: PasswordAuthentication = null
          val userInfo = getRequestingURL().getUserInfo()
          if (userInfo != null) {
            val parts = userInfo.split(":", 2)
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
          }
          return passAuth
        }
      }
    )
  }
  ...
}
```
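The Authenticator above reads credentials from the user-info part of the requesting URL (`user:password@host`). It splits on the first colon only, so a password may itself contain `:`. The sketch below isolates that parsing step with `java.net.URI`; `UserInfoAuth` is a hypothetical name. In the excerpt, a user-info string without any colon would make `parts(1)` throw an `ArrayIndexOutOfBoundsException`. This sketch returns `None` in that case instead, which is a choice made here and not Spark's behavior.

```scala
import java.net.{PasswordAuthentication, URI}

object UserInfoAuth {
  /**
   * Extracts credentials from "user:password@host" using split(":", 2), as the excerpt does,
   * so only the first ':' separates user from password.
   * Returns None when the URL has no user info or the user info has no ':'.
   */
  def credentials(url: String): Option[PasswordAuthentication] =
    Option(new URI(url).getUserInfo).flatMap { userInfo =>
      userInfo.split(":", 2) match {
        case Array(user, password) => Some(new PasswordAuthentication(user, password.toCharArray))
        case _                     => None
      }
    }
}
```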
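RpcEnv handles distributed communication and data transfer between the Master and the Workers. It is the RPC environment object and manages the whole lifecycle of RpcEndpoints. Its main functions are:

- registering endpoints by name or URI;
- handling the various messages;
- stopping endpoints.

An RpcEnv can only be created through NettyRpcEnvFactory.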
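An RpcEndpoint is a communication endpoint; for example, the Master and each Worker in a Spark cluster are RpcEndpoints. To communicate with an RpcEndpoint you must first obtain an RpcEndpointRef for it, and such a reference can only be obtained through an RpcEnv. A client sends a message through the RpcEndpointRef. The RpcEnv handles the message first, determines which endpoint it is addressed to, and then routes it to that RpcEndpoint.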
Starting with Spark 1.6, Netty replaced Akka as the underlying communication framework; the implementation class is NettyRpcEnv.
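The following model makes the RpcEnv / RpcEndpoint / RpcEndpointRef roles above concrete. It is deliberately simplified: synchronous and in-process. All names (`ToyRpcEnv`, `ToyEndpoint`, `ToyEndpointRef`, `PingEndpoint`) are hypothetical. Real Spark RPC is asynchronous, goes through a Dispatcher and the network, and has a much richer API. The point the model shows is that a reference carries only a name, and the environment resolves that name to an endpoint when a message is sent.

```scala
import scala.collection.mutable

// Hypothetical endpoint interface; a synchronous imitation of an endpoint that replies to requests.
trait ToyEndpoint {
  def receiveAndReply(msg: Any): Any
}

// A reference knows only the endpoint's name and the environment that can route to it.
final class ToyEndpointRef(val name: String, env: ToyRpcEnv) {
  def ask(msg: Any): Any = env.route(name, msg)
}

final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpoint]

  // Register an endpoint under a unique name and hand back a reference to it.
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    synchronized {
      require(!endpoints.contains(name), s"endpoint $name is already registered")
      endpoints(name) = endpoint
    }
    new ToyEndpointRef(name, this)
  }

  // Find the endpoint a message is addressed to and deliver the message (outside the lock).
  def route(name: String, msg: Any): Any = {
    val target = synchronized(endpoints.get(name)).getOrElse(throw new NoSuchElementException(s"no endpoint named $name"))
    target.receiveAndReply(msg)
  }
}

// A trivial endpoint used in the usage example.
object PingEndpoint extends ToyEndpoint {
  def receiveAndReply(msg: Any): Any = msg match {
    case "ping" => "pong"
    case other  => s"unknown message: $other"
  }
}
```

For example, `new ToyRpcEnv().setupEndpoint("ping-endpoint", PingEndpoint).ask("ping")` returns `"pong"`.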
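SerializerManager configures serialization, compression and encryption for the various Spark components. The default serializer implementation is org.apache.spark.serializer.JavaSerializer. Users can configure a different implementation, such as org.apache.spark.serializer.KryoSerializer, through the spark.serializer property.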
```scala
/**
 * Component which configures serialization, compression and encryption for various Spark
 * components, including automatic selection of which [[Serializer]] to use for shuffles.
 */
private[spark] class SerializerManager(
    defaultSerializer: Serializer,
    conf: SparkConf,
    encryptionKey: Option[Array[Byte]]) {

  // Whether to compress broadcast variables that are stored
  private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS)
  // Whether to compress shuffle output that are stored
  private[this] val compressShuffle = conf.get(config.SHUFFLE_COMPRESS)
  // Whether to compress RDD partitions that are stored serialized
  private[this] val compressRdds = conf.get(config.RDD_COMPRESS)
  // Whether to compress shuffle output temporarily spilled to disk
  private[this] val compressShuffleSpill = conf.get(config.SHUFFLE_SPILL_COMPRESS)
  ...
}
```
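The four flags above are consulted per block, according to what kind of data the block holds. The sketch below models that decision with a hypothetical, much smaller set of block kinds (`ToyBlockId` and its cases); Spark's real `BlockId` hierarchy has more cases. The property names are the standard ones, and the defaults are the documented defaults: broadcast, shuffle and spill compression on, RDD compression off.

```scala
// Hypothetical, simplified block kinds.
sealed trait ToyBlockId
case object BroadcastBlock extends ToyBlockId
case object ShuffleBlock extends ToyBlockId
case object RddBlock extends ToyBlockId
case object SpillBlock extends ToyBlockId
case object OtherBlock extends ToyBlockId

final class ToyCompressionPolicy(conf: Map[String, String]) {
  private def flag(key: String, default: Boolean): Boolean =
    conf.get(key).map(_.trim.toBoolean).getOrElse(default)

  // The same four switches as the SerializerManager fields above.
  private val compressBroadcast    = flag("spark.broadcast.compress", default = true)
  private val compressShuffle      = flag("spark.shuffle.compress", default = true)
  private val compressRdds         = flag("spark.rdd.compress", default = false)
  private val compressShuffleSpill = flag("spark.shuffle.spill.compress", default = true)

  /** Picks the switch that applies to this kind of block; unknown kinds are left uncompressed. */
  def shouldCompress(block: ToyBlockId): Boolean = block match {
    case BroadcastBlock => compressBroadcast
    case ShuffleBlock   => compressShuffle
    case RddBlock       => compressRdds
    case SpillBlock     => compressShuffleSpill
    case OtherBlock     => false
  }
}
```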
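The closureSerializer's implementation class is fixed to org.apache.spark.serializer.JavaSerializer and cannot be chosen by the user. JavaSerializer serializes closures with Java's built-in ObjectOutputStream; task serialization currently supports only Java serialization.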
```scala
val closureSerializer = new JavaSerializer(conf)

/**
 * :: DeveloperApi ::
 * A Spark serializer that uses Java's built-in serialization.
 *
 * @note This serializer is not guaranteed to be wire-compatible across different versions of
 * Spark. It is intended to be used to serialize/de-serialize data within a single
 * Spark application.
 */
@DeveloperApi
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
  ...
}
```
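Before serializing a closure, Spark uses the utility class org.apache.spark.util.ClosureCleaner to try to remove references to unrelated outer objects from it. ClosureCleaner works at runtime, so it can remove irrelevant references more precisely than the Scala compiler can. This has two benefits. It makes the closure as likely as possible to be serializable, and it shrinks the serialized closure, which makes it cheaper to send over the network.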
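The following example shows why an unneeded outer reference matters. Whether a compiler-generated lambda captures its enclosing `this` depends on the Scala version. To keep the example deterministic, the closures are written out by hand as `Function1` classes so that the captured state is explicit. `DriverHelper`, `ScaleViaOuter`, `ScaleByValue` and `ClosureSerialization` are hypothetical names. The serialization step uses plain `ObjectOutputStream`, the same mechanism JavaSerializer relies on. The example does not use ClosureCleaner itself. It only contrasts a closure before and after the kind of cleanup ClosureCleaner performs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical driver-side object that is deliberately NOT Serializable.
final class DriverHelper(val factor: Int)

// A closure that keeps a reference to the whole enclosing object although it only needs `factor`
// (the situation an uncleaned `$outer` reference creates).
final class ScaleViaOuter(outer: DriverHelper) extends (Int => Int) with Serializable {
  def apply(x: Int): Int = x * outer.factor
}

// The "cleaned" equivalent: only the value the body actually uses is captured.
final class ScaleByValue(factor: Int) extends (Int => Int) with Serializable {
  def apply(x: Int): Int = x * factor
}

object ClosureSerialization {
  /** Java-serializes `obj`; returns the number of bytes written, or None if it is not serializable. */
  def serializedSize(obj: AnyRef): Option[Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Some(bytes.size())
    } catch {
      case _: NotSerializableException => None
    } finally out.close()
  }
}
```

Both closures compute the same result. Only the one that captured the non-serializable outer object fails to serialize.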
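BroadcastManager stores configuration information and serialized data such as RDDs, Jobs and ShuffleDependency objects locally. For fault tolerance, this data is also replicated to other nodes.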
```scala
// Called by SparkContext or Executor before using Broadcast
// (BroadcastManager can only be used after initialize() has run)
private def initialize() {
  synchronized {
    if (!initialized) {
      broadcastFactory = new TorrentBroadcastFactory
      broadcastFactory.initialize(isDriver, conf, securityManager)
      initialized = true
    }
  }
}

// An AtomicLong generates a unique id for every broadcast object
private val nextBroadcastId = new AtomicLong(0)

// newBroadcast delegates to TorrentBroadcastFactory.newBroadcast to create the broadcast object
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}

// unbroadcast delegates to TorrentBroadcastFactory.unbroadcast to remove the broadcast object
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
  broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}
```
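The excerpt combines two small concurrency patterns. A `synchronized` check-then-set makes initialization happen exactly once, and `AtomicLong.getAndIncrement` hands out unique IDs without a lock. Below is a self-contained toy with hypothetical names (`ToyBroadcastManager`, `ToyBroadcast`). It also makes the flag `@volatile` because `newBroadcast` reads it outside the lock. The factory is reduced to a counter, so that the "exactly once" property can be observed.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

// Hypothetical broadcast handle: just an ID and the value.
final case class ToyBroadcast[T](id: Long, value: T)

final class ToyBroadcastManager {
  @volatile private var initialized = false
  val factoryInitCount = new AtomicInteger(0) // stands in for broadcastFactory.initialize(...)
  private val nextBroadcastId = new AtomicLong(0)

  /** Check-then-set under `synchronized`: concurrent callers initialize the factory only once. */
  def initialize(): Unit = synchronized {
    if (!initialized) {
      factoryInitCount.incrementAndGet()
      initialized = true
    }
  }

  /** getAndIncrement gives every caller a distinct ID without taking a lock. */
  def newBroadcast[T](value: T): ToyBroadcast[T] = {
    require(initialized, "initialize() must be called first")
    ToyBroadcast(nextBroadcastId.getAndIncrement(), value)
  }
}
```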
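MapOutputTracker tracks the output status of map-stage tasks. Reduce-stage tasks use this status to find the address of each map output and fetch the intermediate results.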
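On the Driver a MapOutputTrackerMaster is created; elsewhere a MapOutputTrackerWorker is created. The trackerEndpoint is then set through registerOrLookupEndpoint:

- On the Driver, it registers a MapOutputTrackerMasterEndpoint.
- Elsewhere, it uses RpcUtils.makeDriverRef to obtain an RPC reference to the Driver's trackerEndpoint.

Executors obtain map task statuses by sending a GetMapOutputStatuses message to the Driver, which returns the statuses to them.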
```scala
val mapOutputTracker = if (isDriver) {
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}

// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

// The Executor asks the Driver for the map task statuses of the given shuffleId
if (fetchedStatuses == null) {
  // We won the race to fetch the statuses; do so
  logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
  // This try-finally prevents hangs due to timeouts:
  try {
    val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
    fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
    logInfo("Got the output locations")
    mapStatuses.put(shuffleId, fetchedStatuses)
  } finally {
    fetching.synchronized {
      fetching -= shuffleId
      fetching.notifyAll()
    }
  }
}
```
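The fetch above is the tail of a "one fetcher, many waiters" pattern. The first thread to ask for a shuffle's statuses records the shuffleId in `fetching` and does the remote call. Other threads asking for the same shuffle wait on `fetching` and then read the cached result. The sketch below reconstructs the whole pattern in a simplified form. `ToyStatusCache` is a hypothetical name, and `fetchRemote` stands in for `askTracker(GetMapOutputStatuses(shuffleId))`. The waiting half is written the way this kind of code usually looks; it is not quoted from Spark.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

final class ToyStatusCache(fetchRemote: Int => Vector[String]) {
  private val mapStatuses = new ConcurrentHashMap[Int, Vector[String]]()
  private val fetching = mutable.HashSet.empty[Int] // shuffle IDs currently being fetched

  def getStatuses(shuffleId: Int): Vector[String] = {
    var fetched = mapStatuses.get(shuffleId)
    if (fetched == null) {
      fetching.synchronized {
        // Another thread is already fetching this shuffle: wait until it finishes.
        while (fetching.contains(shuffleId)) fetching.wait()
        fetched = mapStatuses.get(shuffleId) // it may have filled the cache meanwhile
        if (fetched == null) fetching += shuffleId // we won the race
      }
      if (fetched == null) {
        try {
          fetched = fetchRemote(shuffleId)
          mapStatuses.put(shuffleId, fetched) // cache before waking the waiters
        } finally {
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      }
    }
    fetched
  }
}
```

Because the result is cached before the shuffleId leaves `fetching`, every waiter that wakes up finds it in the cache. Many concurrent requests for one shuffle therefore cost a single remote call.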
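MapOutputTrackerMaster tracks the output status of every map task through shuffleStatuses. Its key is the shuffleId, and each ShuffleStatus holds the status (MapStatus) of every map task in that shuffle. Each MapStatus records the BlockManagerId, the address of the block manager holding the map output. A reduce task therefore knows where to fetch each map task's intermediate output.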
```scala
// HashMap for storing shuffleStatuses in the driver.
// Statuses are dropped only by explicit de-registering.
// Exposed for testing
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

/**
 * Helper class used by the [[MapOutputTrackerMaster]] to perform bookkeeping for a single
 * ShuffleMapStage.
 *
 * This class maintains a mapping from mapIds to `MapStatus`. It also maintains a cache of
 * serialized map statuses in order to speed up tasks' requests for map output statuses.
 *
 * All public methods of this class are thread-safe.
 */
private class ShuffleStatus(numPartitions: Int) {

  // All accesses to the following state must be guarded with `this.synchronized`.

  /**
   * MapStatus for each partition. The index of the array is the map partition id.
   * Each value in the array is the MapStatus for a partition, or null if the partition
   * is not available. Even though in theory a task may run multiple times (due to speculation,
   * stage retries, etc.), in practice the likelihood of a map output being available at multiple
   * locations is so small that we choose to ignore that case and store only a single location
   * for each output.
   */
  // Exposed for testing
  val mapStatuses = new Array[MapStatus](numPartitions)
  ...
}

/**
 * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
 */
private[spark] sealed trait MapStatus {
  /** Location where this task was run. */
  def location: BlockManagerId
  ...
}
```
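The following toy illustrates the bookkeeping described above. There is one array slot per map partition, and `null` means that output is not available. A reducer uses each status's location and per-reducer size to decide what to fetch. `ToyMapStatus` and `ToyShuffleStatus` are hypothetical and much simpler than Spark's classes. The model uses a host string instead of a `BlockManagerId` and keeps exact sizes. Skipping zero-size blocks and dropping outputs on a lost host are choices made for the illustration.

```scala
// Hypothetical MapStatus: where the map task ran and how many bytes it wrote for each reducer.
final case class ToyMapStatus(location: String, sizesPerReducer: Vector[Long])

/** One slot per map partition; null until that partition's map output is registered. */
final class ToyShuffleStatus(numPartitions: Int) {
  private val mapStatuses = new Array[ToyMapStatus](numPartitions)

  def addMapOutput(mapId: Int, status: ToyMapStatus): Unit = synchronized { mapStatuses(mapId) = status }

  /** Forget every output stored on a lost host; those map tasks would have to run again. */
  def removeOutputsOnHost(host: String): Unit = synchronized {
    for (i <- mapStatuses.indices if mapStatuses(i) != null && mapStatuses(i).location == host)
      mapStatuses(i) = null
  }

  def missingPartitions: Seq[Int] = synchronized { mapStatuses.indices.filter(mapStatuses(_) == null) }

  /** What reducer `reduceId` must fetch: (location, mapId, bytes) for every non-empty map output. */
  def blocksFor(reduceId: Int): Seq[(String, Int, Long)] = synchronized {
    require(missingPartitions.isEmpty, "some map outputs are missing")
    mapStatuses.indices.collect {
      case mapId if mapStatuses(mapId).sizesPerReducer(reduceId) > 0 =>
        (mapStatuses(mapId).location, mapId, mapStatuses(mapId).sizesPerReducer(reduceId))
    }
  }
}
```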
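ShuffleManager manages shuffle operations on local and remote block data. SparkEnv instantiates it by reflection, based on the spark.shuffle.manager property. The default value, sort, maps to org.apache.spark.shuffle.sort.SortShuffleManager. The property also accepts tungsten-sort, but since Spark 2.0 that short name maps to the same SortShuffleManager class, which picks its Tungsten (serialized) shuffle path internally when the conditions allow. The fully qualified class name of another ShuffleManager implementation can also be given.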
```scala
/**
 * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
 * and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
 * with it, and executors (or tasks running locally in the driver) can ask to read and write data.
 *
 * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
 * boolean isDriver as parameters.
 */
private[spark] trait ShuffleManager {
  ...
}
```
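The following is a sketch of the name lookup SparkEnv performs before it instantiates the shuffle manager class. It covers short names, case-insensitive matching, and a fall-through for fully qualified class names. The reflective instantiation step is left out, and `ShuffleManagerResolution` and `com.example.MyShuffleManager` are hypothetical names used only for illustration.

```scala
import java.util.Locale

object ShuffleManagerResolution {
  private val SortShuffleManager = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Short names accepted by spark.shuffle.manager; both map to the same class.
  private val shortNames = Map(
    "sort" -> SortShuffleManager,
    "tungsten-sort" -> SortShuffleManager)

  /** Returns the class name that would be instantiated for this configuration. */
  def resolve(conf: Map[String, String]): String = {
    val name = conf.getOrElse("spark.shuffle.manager", "sort")
    shortNames.getOrElse(name.toLowerCase(Locale.ROOT), name) // unknown names are taken as class names
  }
}
```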
MemoryManager is implemented by UnifiedMemoryManager, a dynamic memory manager in which the execution and storage regions can borrow memory from each other.
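Overall, memory is divided into three parts: storage memory, execution memory, and the memory left outside these two regions.

- **Storage memory** caches RDDs, unrolled partitions, direct task results and broadcast variables. In Spark Streaming receiver mode it also holds each batch's blocks.
- **Execution memory** buffers data for shuffles, joins, sorts and aggregations.
- **The rest** is left to the system. It consists of the fixed 300MB of reserved memory plus the remaining share of the heap, which is used for user data structures and Spark's internal metadata.

By default storage and execution each start with half of the unified region. When one side runs short, it can borrow from the other, which makes better use of memory overall.

If storage has borrowed execution memory and execution later needs that memory, cached blocks are evicted until enough borrowed memory is released to satisfy execution's request. The reverse does not hold. When storage runs short, execution never evicts its own data to return borrowed memory; storage has to wait until execution releases it.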
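The asymmetry described above can be captured in a few lines. The sketch below is a toy model counted in abstract units, and `ToyUnifiedPool` is a hypothetical name. It keeps three rules:

- Storage may use any free memory but never evicts execution.
- Execution may use free memory and, if that is not enough, evict cached blocks.
- Execution can only evict the part of storage usage that lies above the storage region.

Unlike Spark, requests are all-or-nothing, and storage does not evict other cached blocks to make room for itself.

```scala
/** Hypothetical toy model of the borrowing rules, in abstract units; not thread-safe. */
final class ToyUnifiedPool(val total: Long, val storageRegion: Long) {
  private var storage = 0L
  private var execution = 0L

  def storageUsed: Long = storage
  def executionUsed: Long = execution
  def free: Long = total - storage - execution

  /** Storage may take any free memory, including execution's idle share, but never evicts execution. */
  def acquireStorage(n: Long): Boolean =
    if (n <= free) { storage += n; true } else false

  /** Execution takes free memory first, then evicts cached blocks, but only storage above its region. */
  def acquireExecution(n: Long): Boolean = {
    val evictable = math.max(0L, storage - storageRegion)
    if (n > free + evictable) false
    else {
      storage -= math.max(0L, n - free) // evict just enough cached blocks
      execution += n
      true
    }
  }
}
```

The following walk-through starts with 100 units in total and a 50-unit storage region:

1. Caching 80 units succeeds, because storage borrows 30 idle execution units.
2. A 40-unit execution request then finds only 20 units free and evicts 20 units of cached blocks.
3. A further 10-unit storage request fails, because storage cannot evict execution.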
```scala
/**
 * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
 * either side can borrow memory from the other.
 *
 * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
 * configurable through `spark.memory.fraction` (default 0.6). The position of the boundary
 * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
 * This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default.
 *
 * Storage can borrow as much execution memory as is free until execution reclaims its space.
 * When this happens, cached blocks will be evicted from memory until sufficient borrowed
 * memory is released to satisfy the execution memory request.
 *
 * Similarly, execution can borrow as much storage memory as is free. However, execution
 * memory is *never* evicted by storage due to the complexities involved in implementing this.
 * The implication is that attempts to cache blocks may fail if execution has already eaten
 * up most of the storage space, in which case the new blocks will be evicted immediately
 * according to their respective storage levels.
 *
 * @param onHeapStorageRegionSize Size of the storage region, in bytes.
 *                          This region is not statically reserved; execution can borrow from
 *                          it if necessary. Cached blocks can be evicted only if actual
 *                          storage memory usage exceeds this region.
 */
private[spark] class UnifiedMemoryManager(
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) {
  ...
}
```
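The sizes in the comment above reduce to a few lines of arithmetic. The unified region is (heap - 300MB) * `spark.memory.fraction`, and the storage region is that amount times `spark.memory.storageFraction`. Execution gets the rest, matching the two values passed to `MemoryManager` above. `UnifiedMemoryRegions` is a hypothetical helper, and the heap size is taken to be whatever the JVM reports as its maximum.

For a 4 GiB heap this gives 3796 MB usable, about 2277.6 MB unified, and about 1138.8 MB each for storage and execution at the start.

```scala
object UnifiedMemoryRegions {
  val ReservedBytes: Long = 300L * 1024 * 1024 // the fixed 300MB taken off the heap first

  final case class Regions(unified: Long, storage: Long, execution: Long)

  /** unified = (heap - 300MB) * memoryFraction; storage = unified * storageFraction; execution = rest. */
  def regions(heapBytes: Long, memoryFraction: Double = 0.6, storageFraction: Double = 0.5): Regions = {
    val usable = heapBytes - ReservedBytes
    require(usable > 0, "heap must be larger than the reserved 300MB")
    val unified = (usable * memoryFraction).toLong
    val storage = (unified * storageFraction).toLong
    Regions(unified, storage, unified - storage)
  }
}
```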
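BlockTransferService is implemented by NettyBlockTransferService. It builds on Netty's asynchronous, event-driven network application framework to provide a block transfer server and client, which are used to fetch sets of blocks from remote nodes.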
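BlockManagerMaster manages and coordinates the BlockManagers; the actual work is delegated to BlockManagerMasterEndpoint. BlockManagerMaster looks up or registers the BlockManagerMasterEndpoint through registerOrLookupEndpoint, which behaves differently on the Driver and on Executors:

- If the current process is the Driver, it creates a BlockManagerMasterEndpoint and registers it with the Dispatcher under the name BlockManagerMaster.
- If the current process is an Executor, it looks up a reference to the BlockManagerMasterEndpoint in the Dispatcher of the remote Driver's NettyRpcEnv.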
```scala
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)

/**
 * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
 * of all slaves' block managers.
 */
private[spark]
class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {
  ...
}
```
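`registerOrLookupEndpoint` takes its endpoint as a by-name parameter (`endpointCreator: => RpcEndpoint`). As a result, only the Driver ever constructs the endpoint, and an Executor merely looks it up. The toy below shows that property with a shared in-memory table standing in for the Driver's RpcEnv. `ToyDriverRegistry` and `RegisterOrLookup` are hypothetical names. The toy also returns the endpoint object itself rather than an `RpcEndpointRef`, to keep it short.

```scala
import scala.collection.mutable

/** Hypothetical stand-in for the Driver's RpcEnv: a name -> endpoint table. */
final class ToyDriverRegistry {
  private val endpoints = mutable.Map.empty[String, AnyRef]

  def setupEndpoint(name: String, endpoint: AnyRef): AnyRef = synchronized {
    require(!endpoints.contains(name), s"$name is already registered")
    endpoints(name) = endpoint
    endpoint
  }

  def lookup(name: String): AnyRef = synchronized {
    endpoints.getOrElse(name, throw new NoSuchElementException(s"$name is not registered on the driver"))
  }
}

object RegisterOrLookup {
  /** The driver builds and registers; an executor never evaluates `creator` and only looks up. */
  def registerOrLookup(registry: ToyDriverRegistry, isDriver: Boolean, name: String)
                      (creator: => AnyRef): AnyRef =
    if (isDriver) registry.setupEndpoint(name, creator) else registry.lookup(name)
}
```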
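BlockManager manages blocks. The BlockManager object is created when SparkContext initialization builds the SparkEnv execution environment. It only becomes usable later in SparkContext's initialization, when its initialize() method is called.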
```scala
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)

/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that [[initialize()]] must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    val serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {
  ...
}
```
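The metrics system is created differently depending on whether the current instance is the Driver or an Executor. The Driver postpones starting it until an application ID is available, while an Executor starts it immediately.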
```scala
val metricsSystem = if (isDriver) {
  // On the Driver, create the metrics system with the instance name "driver".
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem(MetricsSystemInstances.DRIVER, conf, securityManager)
} else {
  // On an Executor, create the metrics system with the instance name "executor" and start it.
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set(EXECUTOR_ID, executorId)
  val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.EXECUTOR, conf,
    securityManager)
  ms.start()
  ms
}
```
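The most important step in constructing a MetricsSystem is the call to MetricsConfig's initialize method. It mainly loads the metrics.properties file, whose location can be changed with the spark.metrics.conf parameter. It then applies any Spark configuration entries prefixed with spark.metrics.conf., which take precedence.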
```scala
/**
 * Load properties from various places, based on precedence
 * If the same property is set again latter on in the method, it overwrites the previous value
 */
def initialize() {
  // Add default properties in case there's no properties file
  setDefaultProperties(properties)

  loadPropertiesFromFile(conf.get(METRICS_CONF))

  // Also look for the properties in provided Spark configuration
  val prefix = "spark.metrics.conf."
  conf.getAll.foreach {
    case (k, v) if k.startsWith(prefix) =>
      properties.setProperty(k.substring(prefix.length()), v)
    case _ =>
  }
  ...
}
```
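The precedence in `initialize` is "later writes win". Defaults come first, then the metrics file, and finally any `spark.metrics.conf.*` entry from the Spark configuration, with the prefix stripped. `MetricsConfigPrecedence` below is a hypothetical helper that reproduces just that layering on `java.util.Properties`. The keys in the usage example follow the documented `metrics.properties` syntax and are examples only.

```scala
import java.util.Properties

object MetricsConfigPrecedence {
  private val Prefix = "spark.metrics.conf."

  /** Applies the layers in order; a later layer overwrites any key already set by an earlier one. */
  def resolve(defaults: Map[String, String],
              fromFile: Map[String, String],
              sparkConf: Map[String, String]): Properties = {
    val props = new Properties()
    defaults.foreach { case (k, v) => props.setProperty(k, v) }
    fromFile.foreach { case (k, v) => props.setProperty(k, v) }
    sparkConf.foreach {
      case (k, v) if k.startsWith(Prefix) => props.setProperty(k.substring(Prefix.length), v)
      case _ => // unrelated Spark settings are ignored
    }
    props
  }
}
```

For example, `*.sink.console.period` set to 10 by default, 20 in the file and 30 via `spark.metrics.conf.*.sink.console.period` resolves to 30.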
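When a Spark application uses Spark SQL (including Hive) or needs to save task output to HDFS, OutputCommitCoordinator decides whether a task may commit its output to HDFS, using a "first committer wins" policy.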
```scala
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

/**
 * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
 * policy.
 *
 * OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is
 * configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to
 * commit output will be forwarded to the driver's OutputCommitCoordinator.
 *
 * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
 * for an extensive design discussion.
 */
private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
  ...
}
```
On both the Driver and the Executors, OutputCommitCoordinator's coordinatorRef property ends up holding the reference to the Driver's OutputCommitCoordinatorEndpoint.
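The "first committer wins" policy can be sketched as a small arbiter keyed by (stage, partition). The first task attempt that asks is authorized, and any other attempt for the same partition is denied. If the authorized attempt fails before committing, the slot is freed for another attempt. `ToyCommitCoordinator` is a hypothetical, single-process simplification. It leaves out the RPC forwarding from Executors to the Driver and everything else Spark's class tracks.

```scala
import scala.collection.mutable

/** Hypothetical "first committer wins" arbiter: one authorized attempt per (stage, partition). */
final class ToyCommitCoordinator {
  private val authorized = mutable.Map.empty[(Int, Int), Int] // (stage, partition) -> attempt

  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case None =>
        authorized((stage, partition)) = attempt // the first attempt to ask wins
        true
      case Some(winner) => winner == attempt // the winner may ask again; all others are denied
    }
  }

  /** If the authorized attempt fails before committing, let another attempt take over. */
  def attemptFailed(stage: Int, partition: Int, attempt: Int): Unit = synchronized {
    if (authorized.get((stage, partition)).contains(attempt)) authorized.remove((stage, partition))
  }
}
```

A speculative copy of a task that asks second is refused. Once the winner is reported as failed, a later attempt can be authorized, so the partition's output is still committed exactly once.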