热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(版本定制)第13课:Spark

本期内容:1.ReceiverBlockTracker容错安全性 2.DStream和JobGenerator容错安全性一:容错安全性 1.ReceivedBlockTracker

本期内容:
1. ReceiverBlockTracker容错安全性 
2. DStream和JobGenerator容错安全性

一:容错安全性 
1. ReceivedBlockTracker负责管理Spark Streaming运行程序的元数据。数据层面 
2. DStream和JobGenerator是作业调度的核心层面,也就是具体调度到什么程度了,从运行的考虑的。DStream是逻辑层面。 
3. 作业生存层面,JobGenerator是Job调度层面,具体调度到什么程度了。从运行的角度的。

谈Driver容错你要考虑Driver中有那些需要维持状态的运行。 
1. ReceivedBlockTracker跟踪了数据,因此需要容错。通过WAL方式容错。 
2. DStreamGraph表达了依赖关系,恢复状态的时候需要根据DStream恢复计算逻辑级别的依赖关系。通过checkpoint方式容错。 
3. JobGenerator表面你是怎么基于ReceiverBlockTracker中的数据,以及DStream构成的依赖关系不断的产生Job的过程。你消费了那些数据,进行到什么程度了。

总结如下:

(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

ReceivedBlockTracker: 
1. ReceivedBlockTracker会管理Spark Streaming运行过程中所有的数据。并且把数据分配给需要的batches,所有的动作都会被WAL写入到Log中,Driver失败的话,就可以根据历史恢复tracker状态,在ReceivedBlockTracker创建的时候,使用checkpoint保存历史目录。

下面就从Receiver收到数据之后,怎么处理的开始。 
2. ReceiverBlockTracker.addBlock源码如下: 
Receiver接收到数据,把元数据信息汇报上来,然后通过ReceiverSupervisorImpl就将数据汇报上来,就直接通过WAL进行容错. 
当Receiver的管理者,ReceiverSupervisorImpl把元数据信息汇报给Driver的时候,正在处理是交给ReceiverBlockTracker. ReceiverBlockTracker将数据写进WAL文件中,然后才会写进内存中,被当前的Spark Streaming程序的调度器使用的,也就是JobGenerator使用的。JobGenerator不可能直接使用WAL。WAL的数据在磁盘中,这里JobGenerator使用的内存中缓存的数据结构

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
try {
val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) //接收数据后,先进行WAL
if (writeResult) {
      synchronized {
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo //当WAL成功后,将Block Info元数据信息加入到Block Queue中
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
false
}
}

Driver端接收到的数据保存在streamIdToUnallocatedBlockQueues中,具体结构如下:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
allocateBlocksToBatch把接收到的数据分配给batch,根据streamId取出Block,由此就知道Spark Streaming处理数据的时候可以有不同数据来源
那到底什么是batchTime? 
batchTime是上一个Job分配完数据之后,开始再接收到的数据的时间。
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) //根据StreamId获取Block信息
    }.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime //这里有对batchTime进行赋值,就是上一个job分配完数据后,开始在接收到数据的时间
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

随着时间的推移,会不断产生RDD,这时就需要清理掉一些历史数据,可以通过cleanupOldBatches方法来清理历史数据

/**
 * Clean up block information of old batches. If waitForCompletion is true, this method
 * returns only after the files are cleaned up.
 */
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}

以上几个方法都进行了WAL动作

(record: ReceivedBlockTrackerLogEvent): = {
(isWriteAheadLogEnabled) {
    logTrace(record)
{
.get.write(ByteBuffer.(Utils.(record))clock.getTimeMillis())
} {
(e) =>
        logWarning(recorde)
}
  } {
}
}

总结: 
WAL对数据的管理包括数据的生成,数据的销毁和消费。上述在操作之后都要先写入到WAL的文件中. 

(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

JobGenerator: 
Checkpoint会有时间间隔Batch Duractions,Batch执行前和执行后都会进行checkpoint。 
doCheckpoint被调用的前后流程: 
(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

1、简单看下generateJobs

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) //job完成后就需要进行checkpoint动作
}

2、processEvent接收到消息事件

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater) // 调用doCheckpoint方法
case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

3、doCheckpoint源码如下:

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time) //最终是进行RDD的Checkpoint
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

4、DStream中的updateCheckpointData源码如下:最终导致RDD的Checkpoint

/**
 * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
 * this stream. This is an internal method that should not be called directly. This is
 * a default implementation that saves only the file names of the checkpointed RDDs to
 * checkpointData. Subclasses of DStream (especially those of InputDStream) may override
 * this method to save custom checkpoint data.
 */
private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug("Updating checkpoint data for time " + currentTime)
checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}

JobGenerator容错安全性如下图: 
(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

参考博客:http://blog.csdn.net/snail_gesture/article/details/51492873#comments


推荐阅读
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • 基于PgpoolII的PostgreSQL集群安装与配置教程
    本文介绍了基于PgpoolII的PostgreSQL集群的安装与配置教程。Pgpool-II是一个位于PostgreSQL服务器和PostgreSQL数据库客户端之间的中间件,提供了连接池、复制、负载均衡、缓存、看门狗、限制链接等功能,可以用于搭建高可用的PostgreSQL集群。文章详细介绍了通过yum安装Pgpool-II的步骤,并提供了相关的官方参考地址。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • 本文介绍了Hyperledger Fabric外部链码构建与运行的相关知识,包括在Hyperledger Fabric 2.0版本之前链码构建和运行的困难性,外部构建模式的实现原理以及外部构建和运行API的使用方法。通过本文的介绍,读者可以了解到如何利用外部构建和运行的方式来实现链码的构建和运行,并且不再受限于特定的语言和部署环境。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
author-avatar
二十三点二十三分_465
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有