作者:书友80922185 | 来源:互联网 | 2023-09-04 14:59
1总体框架结构图Spark应用程序架构由上图我们可以看到Spark应用程序架构主要由DriverProgram和Executor构成,Driver负责运行main()和创建Spar
1 总体框架结构图
Spark应用程序架构
由上图我们可以看到Spark应用程序架构主要由Driver Program和Executor构成,Driver负责运行main()和创建SparkContext,Executor主要负责执行Task任务。
2 各大重要组件和概念
2.1 重要概念
概念 | 功能 | 模型 | 备注 |
---|
Application | 用户定义的Spark应用程序,用户提交后,Spark会为应用分配资源,调度和执行相关任务 | | . |
RDD Graph | Spark会分析Spark应用,将程序中所涉及的RDD按照一定的依赖关系构建RDD Graph,也就是根据算子来构建有向无环图,每个RDD Graph都会转化成一个Job | | |
Job | – 一个App中可以包含多个Job,每个Job都是由一个RDD Graph转化而来的 – 由Action算子触发 | | . |
Stage | – 每个Job会根据RDD之间的宽依赖(Shuffle Dependency)来划分成多个Stage,每一个Stage中包含一组Task(也就是TaskSet) – 每个Stage中的Task类型都是相同的 | | . |
Task | – 一个分区对应一个Task – Task执行RDD中对应的算子,Task被封装TaskRunner后放入Executor的线程中来执行,并由TaskRunner来进行调度 – Task类型有ShuffleMapTask和ResultTask | | . |
Worker | 可以运行App代码的节点,如Yarn中NodeManager | | . |
DAGScheduler | 用来构建基于Stage的有向无环图,划分的依据是根据RDD之间的宽依赖 | | . |
TaskScheduler | – 将TaskSet提交给Executor去运行 – 维护TaskSet的运行状态标签,负责Task的失效重执行 | | . |
2.2 相关组件
组件 | 功能 | 模型 | 备注 |
---|
Driver | – 运行Application的main函数并创建SparkContext – SparkContext的作用就是与Cluster Manager通讯,进行资源申请、任务的调度等。 | | . |
Executor | – 某个App运行在Worker上的一个进程,用于执行App的Task – 每个App都有独立的Executor,Executor上运行的Task取决于分配的core数 | | . |
Cluster Manager | 资源调度服务,用于资源的分配和调度,有standalone、mesos和yarn三种模式 | | . |
3 应用的提交执行流程
3.1 运行模式
1)Spark应用的运行模式有以下几种:
- local[n]:本地模式
- standalone:使用spark内部的资源管理器,master-slaves架构
- mesos : Apache下的一个资源管理框架
- yarn : Hadoop下的一个资源管理框架
2)根据Driver的运行位置来划分有以下两种运行模式:
- Cluster:Driver和Executor运行在集群中的Worker上
- Client:Driver运行在客户端上,Executor运行在Worker上
3)命令行
- spark-shell:bin/spark-shell –master yarn –deploy-mode client(yarn的client模式)
- spark-submit:spark-shell底层也是调用spark-submit,如果你想知道它有什么参数,可以直接在spark目录下输入 spark-submit来查看
3.2 流程
这里我们就以 Spark On Yarn 为例来说明
3.2.1 Yarn-Client 模式
流程如下:
- 在客户端通过spark-submit向Yarn提交Application
- 应用在Client启动Driver,创建SparkContext并进行初始化
- ResourceManger为应用申请一个Container来启动Application Master,用来与Client中的SparkContext进行通讯。
- Application Master即承担起了为应用申请资源运行Executor的责任,一旦申请到资源,那么就会在指定的NodeManager的Container上创建Executor进程,Executor会向SparkContext注册,保持通讯
- SparkContext会根据任务所需资源,数据本地性等条件指定Task的运行在哪个Executor上,将Task分发到指定Executor并开始执行,Executor会周期向SparkContext汇报任务运行情况
- 一旦所有任务执行完毕,SparkContext会向ResourceManger申请注销自己并关闭
3.2.2 Yarn-Cluster 模式
流程如下:
- 在客户端通过spark-submit向Yarn提交Application
- ResourceManger为应用申请一个Container来启动Application Master,并且在Application Master进行SparkContext等初始化。
- Application Master即承担起了为应用申请资源运行Executor的责任,一旦申请到资源,那么就会在指定的NodeManager的Container上创建Executor进程,Executor会向SparkContext注册,保持通讯
- Applicaiton Master会根据任务所需资源,数据本地性等条件指定Task的运行在哪个Executor上,将Task分发到指定Executor并开始执行,Executor会周期向Applicaiton Master汇报任务运行情况
- 一旦所有任务执行完毕,Applicaiton Master会向ResourceManger申请注销自己并关闭
3.3 Job的全生命周期
- action算子触发
- 内部实际就是sc.runJob,最终调用了submitJob
- submit中最终调用了DAGScheduler的submitStage,由最后一个stage往前找,直到parentStage缺失调用submitMissingTask
- DAGScheduler将TaskSet提交给TaskScheduler执行
- TaskScheduler中实际调用了submit方法,而该方法是使用远程调用的方式来调用CoraseGraintedExecutorBackend的launchTask方法
- launchTask则是将包装了Task的TadkRunner丢进线程池中去运行
4 Spark的读写流程
4.1 写入流程
- RDD调用compute( )方法进行指定分区的写入。
- CacheManager中调用BlockManater判断数据是否已经写入,如果未写则写入。
- BlockManager中数据与其他节点同步。(与副本节点通讯)
- BlockManager根据存储级别写入指定的存储层。(存储级别决定了数据是存储在内存还是磁盘,或者是两者结合)
- BlockManager向主节点汇报存储状态。
4.2 读取流程
- RDD调用compute( )方法进行指定分区的读取。
- 调用BlockManager的get()进行读取
- 在本地同步读取数据块,首先看能否在内存读取数据块,如果不能读取,则看能否从Tachyon读取数据块,如果仍不能读取,则看能否从磁盘读取数据块。
- 如果本地没有数据,就进行远程读取:远程获取调用路径,然后getRemote调用doGetRemote,通过BlockManagerWorker.syncGetBlock从远程获取数据。
5 Spark各组件之间的通信方式和流程5.1 通讯框架
1.6之前
- Spark在模块间通信使用的是AKKA框架。AKKA基于Scala开发,用于编写Actor应用。
- Actors是一些包含状态和行为的对象。它们通过显式传递消息来进行通信,消息会被发送到它们的消息队列中。对消息队列中的消息进行处理作出响应
- RPC是单独通过AKKA实现,数据以及文件传输是采用netty实现,而AKKA底层也是使用Netty来实现的
1.6 后
- 通过netty封装了一套简洁的类似于AKKA Actor模式的RPC接口,并逐步抛弃AKKA这个重量级框架
- 2.0 之后,所有的网络功能模块都是通过Netty来实现的
5.2 通讯的简单流程
- 客户端发送请求消息,经过Encoder加上头信息,通过网络发送给服务端
- 服务端收到消息之后,经过TransportFrameDecoder进行处理,得到消息类型和消息体,在解析得到具体的请求信息
- 最后由TransportChannelHandler处理具体的请求信息,根据消息类型判断是否回应
6 Spark的容错机制
分布式系统常用的容错机制
- 数据检查点:成本高,耗费资源
- 记录数据的更新:耗费资源较少
Spark所采用的容错机制是记录数据更新和数据检查点相结合的形式。原因如下:
- RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。也就是说当子RDD分区丢失时,只要重新计算它对应的父RDD分区就可以进行恢复,这对于窄依赖来说不存在冗余计算。
- 宽依赖也就是Shuffle Dependency,也就是发生在shuffle阶段的依赖,如果子RDD分区丢失了,那么重新计算的代价就比较大,因为有多个父RDD分区对应该子RDD分区,那么重新计算时所有的父RDD分区都得重新计算,造成了计算冗余
以下情况需要添加检查点
- DAG中依赖线过长,如果重算,则开销太大
- 在Shuffle Dependency上做CheckPoint(检查点)获得的收益更大
在RDD计算中,通过检查点机制进行容错,传统做检查点有两种方式:通过冗余数据和日志记录更新操作。RDD中所采用的就是通过冗余数据来缓存数据,然后对已进行冗余
操作的RDD执行删除该祖先的RDD依赖
官方建议,做检查点的RDD做好先缓存在内存中,否则需要进行重新计算
7 Spark的Shuffle机制
Shuffle中文意思就是混洗,跟MapReduce中的Shuffle的思想是相同,就是数据的重新分区和组合
Spark的Shuffle是在stage的承接阶段完成的,前面的stage会根据后面stage的分区数来将数据按照一定的规则分成相应的bucket,然后写到磁盘上。后续的stage会从元数据那里获得指定数据的所在节点,将数据拉取本地做进一步的操作
Shuffle分为两个阶段
- Shuffle Write :Shuffle的第一步骤,就是将前一个stage中的数据写到磁盘,用于第二步的Fetch
- 主要是在ShuffleMapTask中执行(runTask)方法
- 如果设置了map端进行聚合的话,那么会先执行数据在map的合并,减少网络传输
- Consolidate Shuffle相比普通Shuffle的优势在:普通shuffle产生的Shuffle文件数为map任务数 x Reduce任务数,而Consolidate Shuffle的理论Shuffle文件数为Spark Core数 x Reducer数,而概念上C_Shuffle引入了文件数组,一个Bucket不再对应一个文件,而是对应文件中的一个segment
- Shuffle Fetch:通过本地或者远程抓取需要的数据到本节点进行计算,完成操作如保存数据到指定路径或者作为下一个Shuffle的Shuffle Write
Shuffle Aggregator并不会对所有情况下的数据进行排序,所以Aggregator分为不需要外排和需要外排两种方式
- 不需要外排:数据全部保存在内存当中,使用AppendOnlyMap来进行数据的存储,数据是来一个处理一个,更新到map中,所以reducer的内存必须足够大,能够存储该分区的所有key和count的值
- 需要外排:reduce类型的操作,内存没办法存放所有的key-value对,必须借助外部磁盘
在Reduce端,各个Task会并发启动多个线程同时从多个Map Task端拉取数据。由于Reduce阶段的主要任务是对数据进行按组规约。也就是说,需要将数据分成若干组,以便以组为单位进行处理。大家知道,分组的方式非常多,常见的有:Map/HashTable(key相同的,放到同一个value list中)和Sort(按key进行排序,key相同的一组,经排序后会挨在一起),这两种方式各有优缺点,第一种复杂度低,效率高,但是需要将数据全部放到内存中,第二种方案复杂度高,但能够借助磁盘(外部排序)处理庞大的数据集。Spark前期采用了第一种方案,而在最新的版本中加入了第二种方案, MapReduce则从一开始就选用了基于sort的方案。
摘抄自董西成的文章链接如下http://dongxicheng.org/framework-on-yarn/apache-spark-shuffle-details/