TaskManager启动后,JobManager会与它建立连接,并将作业图(JobGraph)转换成可执行的执行图(ExecutionGraph)分发给可用的TaskManager,然后由TaskManager具体执行任务。
作业管理器(JobManager)
对于一个提交执行的作业,Jobmanager是管理者(Master),负责管理调度,在不考虑高可用的情况下只能有一个。JobManager是一个Flink集群中任务管理和调度的核心,是控制应用执行的主程序。即每个以用都应该被唯一的JobManager所控制执行。
JobManager包括3个组件
JobMaster
jobMaster是JobManager中最核心的组件,负责处理单独的作业。jobMaster和具体的job是一一对应的,多个job可以同时运行在一个Flink集群中,每个job都有一个自己的jobMaster
作业提交时,jobMaster会先接收到要执行的应用。jobMaster会把JobGraph转换成一个物理层面的数据流图,这个图被称作“执行图”,它包含了所有可以并发执行的任务。jobMaster会向资源管理器发出请求,申请执行任务必要的资源。一旦获取到了足够的资源,就会将执行图发到真正运行它们的TaskManager上。
在运行过程中,JobMaster会负责所有需要中央协调的操作。
ResourceManager(资源管理器)
ResourceManager主要负责资源的分配和管理,在Flink集群中只有一个。所谓“资源”,主要指的是TaskManager的任务槽(task Slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(task)都需要分配到一个slot上执行。
在Standalone部署时,因为TaskManager是单独启动的,所以ResourceManager只能分发可用TaskManager的任务槽,不能启动新的TaskManager。
在有资源管理平台时,当新的作业申请资源时,ResourceManager会将有空闲的槽位的TaskManager分配给JobManager。如果ResourceManager没有足够的任务槽,它还可以向资源提供平台发起会话,请求提供启动TaskManager进程的容器。另外,ResourceManager还负责停掉空闲的TaskManager,释放计算资源。
Dispatcher(分发器)
Dispatcher主要负责提供一个REST接口,用来提交作业,并且负责每一个新提交的作业启动一个新的JobManager组件。Dispatcher也会启动一个Web UI,用来方便地展示监控作业执行的信息。
任务管理器(TaskManager)
TaskManager是工作者(Worker、Slave),负责执行任务处理数据,所以可以有一个或多个。
TaskManager是Flink中的工作进程,负责数据流的具体计算任务。Flink集群中必须至少有一个TaskManaager;每个TaskManager都包含了一定数量的任务槽。Slot是资源调度的最小单位,slots的数量限制了TaskManager能够并行处理的任务数量。
启动之后,TaskManager会将一个或多个槽位提供给JobMaster调用,jobMaster就可以分配任务来执行。
在执行过程中,TaskManager可以缓冲数据,还可以跟其它运行同一应用的TaskManager交换数据。
作业提交流程
独立模式
独立模式下,只有会话模式和应用模式两种部署方式。两者流程基本类似:TaskManager都需要手动启动,所以当ResourceManager收到JobMaster的请求时,会直接要求TaskManager提供资源。而JobMaster的启动时间点,会话模式是预先启动,应用模式则是在作业提交时启动。
1. 由客户端通过分发器提供的REST接口,将作业提交给JobManager
2. 由分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster。
3. JobManager将JobGraph解析为可执行的ExecutionGraph,得到所需的资源数量,然后再向资源管理器请求任务资源槽。
4. 资源管理器判断当前是否有足够的资源;如果没有则启动新的TaskManager。
5. TaskManager启动后,向ResourceManager注册自己可用的任务槽。
6. 资源管理器通知TaskManager为新的作业提供slots。
7. TaskManager连接到对应的JobMaster,提供slots
8. jobMaster将需要执行的任务分发给TaskManager。
9. TaskManager执行任务。
Yarn集群
-
会话模式 提交作业流程
1. 客户端通过REST接口,将作业提交给分发器
2. 分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster
3. JobMaster向资源管理器请求资源
4. 资源管理器向Yarn的资源管理器请求container资源
5. Yarn启动新的TaskManager容器
6. TaksManager启动后,向Flink的资源管理器注册自己的可用任务槽
7. 资源管理器通知TaskManager为新的作业提供任务槽
8. TaskManager连接到对应的JobMaster,提供任务槽
9. JobMaster将需要执行的任务分发到TaskManager,执行任务
-
单作业模式提交作业流程
1. 客户端将作业提交给YARN的资源管理器,这一步中会同时将Flink的Jar包和配置上传到HDFS,以便后续启动的Flink相关组件的容器
2. YARN的资源管理器分配容器资源,启动Flink JobManager,并将作业提交给JobMaster
3. JobMaster向资源管理器请求资源
4. 资源管理器请YARN的资源管理器请求容器
5. YARN启动新的TakManager容器
6. TaskManager启动后,向Flink的任务管理器注册自己的可用任务槽
7. 资源管理器通知TaskManager为新的任务提供任务槽
8. TaskManager连接到对应的JobMaster,提供任务槽
9. JobManager将需要执行的任务分发给TaskManager,执行任务
-
应用模式
应用模式与单作业模式的提交流程类似,只是初始提交给YARN资源管理器的不再是具体的作业,而是整个应用。一个应用中可能包含多个作业,这些作业都在Flink集群中启动各自对应的JobMaster。
重要概念
数据流图(Dataflow Graph)
Flink中每一个处理转换操作称为“算子”。
所有的Flink程序都可以归纳为由三部分构成:Source、Transformation和Sink
- Source 表示“元算子”,负责读取数据源。
- Transformation表示“转换算子”,利用各种算子进行加工处理
- Sink表示“下沉算子”,负责数据的输出
在运行时,Flink程序会被映射成所有算子按照逻辑顺序链接在一起的一张图,被称为“逻辑数据流”(数据流图)。
并行度
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度
并行度设置
Flink中,可以用不同的方法来设置并行度,它们的有效范围和优先级是不同的。
- 代码中设置
stream.map((_, 1)).setParallelism(2)
注:KeyBy()方法返回的不是算子,所以无法对KeyBy()设置并行度
2. 提交作业时设置
bin/flink run -p 2 -c com.wc.StreamWordCount ./FlinkWc-1.0.jar
- 配置文件中设置
修改集群的配置文件flink-conf.yaml中直接更改默认并行度
parallelism.default: 2
对整个集群上提交的所有作业有效,默认为1。
算子连
在Flink中,并行度相同的一对一算子操作,可以直接连接在一起 形成一个“大”的任务,这样原本的算子就成为真正任务里的一部分,每个任务会被一个线程执行。这样的技术被称为“算子链”。
算子间数据传输
- 一对一
这种模式下,数据流维护着分区以及元素的顺序。Source算子读取数据之后,可以直接发送给 map()算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着 map() 算子的子任务,看到的元素个数和顺序跟 Source 算子的子任务产生的完全一样,保证着“一对一”的关系。map()、filter()、flatMap()等算子都是这种one-to-one 的对应关系 - 重分区
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为 2 的 window 算子,要传递到并行度为 1 的 Sink 算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程
合并算子链
Flink 默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:
.map((_,1)).disableChaining()
.map((_,1)).startNewChain()
作业图与执行图
Flink 中任务调度执行的图,按照生成顺序可以分成四层:
逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)
- 逻辑流图
根据用户通过DataStream API编写的代码生的最初的DAG图,用来表示程序的拓扑结构 - 作业图
逻辑流图经过优化之后生成的就是作业图,这是提交给JobManager的数据结构,确定了当前作业中所有任务的划分。主要的优化为:将多个符合条件的节点连接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。作业图一般在客户端生成,在作业提交时传递给JobMaster。 - 执行图
JobMaster收到作业图后,会根据它来生成执行图。执行图时作业图的并行化版本,是调度最核心的数据结构。 - 物理图
JobMaster生成执行图后,会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫做物理图,这只是具体执行层面的图,并不是一个具体的数据结构
任务和任务槽
任务槽
Flink中每一个work都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个任务。
为了控制并发量,需要在TaskManager上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽。每个任务槽其实表示了TaskManager拥有计算资源的一个固定大小的子集。
任务槽数量的设置
可以通过集群的配置文件来设定TaskManager的槽数量
taskmanager.numberOfTaskSlots: 8
通过调整槽的数量,可以控制子任务间的隔离级别。槽目前只能用来隔离内存,不会涉及CPU隔离
任务对任务槽的共享
默认情况下,Flink允许子任务共享槽。如果希望某个算子对应的任务完全独占一个槽,或者只有某一部分共享槽,可以通过设置“槽共享组”手动指定
.map((_,1)).slotSharingGroup("1")
这样属于同一个槽共享组的子任务,才会开启槽共享;不同组之间的任务时完全隔离的,必须分配到不同的槽上
任务槽和并行度的关系
任务槽时静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager…numberOfTaskSlots进行设置;而并行度是动态概念,就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行设置。