热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

[第九章]Job触发流程原理剖析

上一节我们是不是讲到,Driver,Application注册到Master上面后,Master中调用scheduler()进行资源调度,在这个里面通过LaunchDriver()

上一节我们是不是讲到,Driver,Application注册到Master上面后,Master中调用scheduler()进行资源调度,在这个里面通过LaunchDriver(),LaunchExecutor(),向Worker发出启动Driver,Exeutor的请求(或者说命令),

Worker接收到发来的请求,通过创建DriverRunner,ExecutorRunner线程来启动我们的Driver与Application.在启动完成后,根据第一章节我们分析的Spark核心原理,Executor会反向注册到Driver上,这样Driver就清楚哪些Executor在执行Application,实际上到了这个时候,我们的SparkContext已经全部初使化完成。

接下来我们就要继续执行我们自己编写的代码程序,其实一个Application包括多个JOB,那么JOB是如何划分的呢? 实际上一个Action操作,会划分一个JOB,就是说多个Action操作就会有多个JOB,JOB执行的顺序是从第一个开始。
下面我们以wordcount实例来详细分析一下JOB的划分:

val lines = sc.textFile(...)
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))

看到上面的几行代码,大家是不是太熟悉了,这就是我们学习spark的第一个例子。
首先我们先来分析第一行代码:
val lines = sc.textFile()

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}

上面的代码是不是很熟悉,

  • 首先,textFile会调用一个hadoopFile(…)的方法,我们看里面的参数,TextInputFormat这是不是很熟悉,这是hadoop里读取文本文件的解析器啊
  • 后面的classOf[LongWritable], classOf[Text]这就是hadoop
    map()函数的k1,v1吧
    这可都是hadoop里的东东。下面我们看hadoopFile方法的代码

def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
//这个是不是我们把hadoop配置数据序列化后在广播出去,共享广播这个我们在之前讲过,不会忘记吧!
val cOnfBroadcast= broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}

不难看出,这个方法返回了一个HadoopRDD算子
接着我们看后面调用了map(&#8230;),map方法具体我们这里就不分析,以前的章节我们分析过了,
.map(pair => pair._2.toString).setName(path)
其中的pair是不是一个tuple,也就是我们Hadoop的k1,v1,那么pair._2.toStrig,是不是我们读取文件的每一行内容。

接下来我们继续执行代码:

val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))

这两行与上面分析map的相似,我们就不重复了。接着我们执行

val counts = pairs.reduceByKey(_ + _)

这行代码我们惊奇的发现在RDD类里没有reduceByKey方法,这是为什么了?没有方法如何调用。
大家是不是想到scala中的隐式转换,在RDD类中找查到了隐式转换方法rddToPairRDDFunctions(),把rdd转换成了rddToPairRDDFunctions,返回这个对象,如下面代码

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

果然在PairRDDFunction类中找到这个方法:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}`

接下来我们执行最后一行代码,

counts.foreach(count => println(count._1 + ": " + count._2))

当我们看foreach时,明白这就是一个action算子。它调用了sparkContext的runJob方法,来划分一个JOB

def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
//执行SparkContext中的runJob
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

经过多个runJob()重载的调用,最后我们找到最终的调用方法:

def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
//调用sparkContext之前初使化 时创建的DAGScheduler的runJob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()

看到这里大家 是不是明白了,最终其实就是调用了sparkContext之前初使化 时创建的DAGScheduler的runJob方法,里面的第一个参数rdd是不是以前的rdd算子。
这里也说明了JOB的划分都是在DAGScheduler里完成的。接下来我们会在在下一章节会对DAGScheduler如何划分JOB进行深入的分析.

本章中每一个字(包括源码注解)都是作者敲出来的,你感觉有用,帮点击&#8217;喜欢&#8217;


推荐阅读
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 在过去,我曾使用过自建MySQL服务器中的MyISAM和InnoDB存储引擎(也曾尝试过Memory引擎)。今年初,我开始转向阿里云的关系型数据库服务,并深入研究了其高效的压缩存储引擎TokuDB。TokuDB在数据压缩和处理大规模数据集方面表现出色,显著提升了存储效率和查询性能。通过实际应用,我发现TokuDB不仅能够有效减少存储成本,还能显著提高数据处理速度,特别适用于高并发和大数据量的场景。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • 构建高可用性Spark分布式集群:大数据环境下的最佳实践
    在构建高可用性的Spark分布式集群过程中,确保所有节点之间的无密码登录是至关重要的一步。通过在每个节点上生成SSH密钥对(使用 `ssh-keygen -t rsa` 命令并保持默认设置),可以实现这一目标。此外,还需将生成的公钥分发到所有节点的 `~/.ssh/authorized_keys` 文件中,以确保节点间的无缝通信。为了进一步提升集群的稳定性和性能,建议采用负载均衡和故障恢复机制,并定期进行系统监控和维护。 ... [详细]
  • 机器学习算法:SVM(支持向量机)
    SVM算法(SupportVectorMachine,支持向量机)的核心思想有2点:1、如果数据线性可分,那么基于最大间隔的方式来确定超平面,以确保全局最优, ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 2022年7月20日:关键数据与市场动态分析
    2022年7月20日,本文对当日的关键数据和市场动态进行了深入分析。主要内容包括:1. 关键数据的解读与趋势分析;2. 市场动态的变化及其对投资策略的影响;3. 相关经济指标的评估。通过这些分析,帮助读者更好地理解当前市场环境,为决策提供参考。 ... [详细]
  • 本文详细解析了一种实用的函数,用于从URL中提取查询参数。该函数通过处理URL中的搜索部分,能够高效地获取并解析出所需的参数值,适用于各种Web开发场景。 ... [详细]
  • Python 序列图分割与可视化编程入门教程
    本文介绍了如何使用 Python 进行序列图的快速分割与可视化。通过一个实际案例,详细展示了从需求分析到代码实现的全过程。具体包括如何读取序列图数据、应用分割算法以及利用可视化库生成直观的图表,帮助非编程背景的用户也能轻松上手。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
author-avatar
1911530988com
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有