热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinkonyarn、资源及状态管理

flink源码学习:http:www.cnblogs.combethunebtjp9168274.htmlflinkonyarn执行任务的两种方式1.yarn-session.sh



flink源码学习:

http://www.cnblogs.com/bethunebtj/p/9168274.html


flink on yarn 执行任务的两种方式

1.yarn-session.sh(开辟资源)+flink run(提交任务)

1.在yarn中起一个守护进程,用于启动多个job,即一个application master 管理多个job
2.启动命令:
./yarn-session.sh -n 4 -jm 1024 -tm 5120 -s 5 -nm yarn-session-jobs -d
参数说明:
(1) -n : 指定number of task manager,指定taskmanager个数
(2) -jm: jobmanager所占用的内存数,单位为MB
(3) -tm: 指定每个taskmanager所占用的内存,单位为MB
(4) -s: 指定每个taskmanager可使用的cpu核数
(5) -nm: 指定Application的名称
(6) -d : 后台启动,session启动后,进程关闭

3.流程说明:
(1) 启动session 后,yarn首先会分配一个Container,用于启动APP master和jobmanager, 所占用内存为-jm指定的内存大小,cpu为1核
(2) 没有启动job之前,jobmanager是不会启动taskmanager的(jobmanager会根据job的并行度,即所占用的slots,来动态的分配taskmanager)
(3) 提交任务到APP master
./flink run -p 3 -yid application_id -d -c com.kn.rt.Test01 ~/jar/dw-1.0-SNAPSHOT.jar
用于启动一个job到指定的APP master中
参数说明:
#1.-p:指定任务的并行度,如果你在程序代码中指定了并行度的话,那么此处的并行度参数不起作用
#2.-yid:指定任务提交到哪一个application—id,默认是提交到本节点最新提交的一个application
#3.-c: job的主入口 + jar path
> 注:job参数要写在-c之前,不然指定参数不起作用...

1.2 flink run -m yarn-cluster(开辟资源+提交任务)

1.启动单个job,即单job单session,实现资源的完全隔离
2.启动job的脚本跟yarn-session 中有差异 ,通过指定 -m yarn-cluster,参数较session都带有-y
./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 3076 -ys 3 -ynm yarn-cluster-1 -yqu root.default -c com.kn.rt.Test01 ~/jar/dw-1.0-SNAPSHOT.jar
参数说明:
#1.-m :yarn-cluster,代表启动单session提交单一job
#2.-yn:taskmanager个数
#3.-yjm:jobmanager的内存占用
#4.-ytm:每个taskmanager的内存占用
#5.-ys: 每个taskmanager可使用的CPU核数
#6.-ynm:application 名称
#7.-yqu:指定job的队列名称
#8.-c: 程序主入口+ jar path

1.3.flink on yarn设置参数注意事项

1.资源都统一有yarn来统一管理,配置-jm -tm -s时,要小于container配置的最大内存、最大CPU核数

1.4.flink-conf.yaml (flink on yarn 配置文件)

env.java.home: /usr/local/jdk1.8.0_181
recovery.mode: zookeeper
recovery.zookeeper.quorum: hadoop-bd1:2181,hadoop-bd2:2181,hadoop-bd3:2181
recovery.zookeeper.path.root: /data/dw/flink
recovery.zookeeper.path.namespace: /cluster_yarn
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://hadoop-bd1:8020/flink/dw/checkpoints
recovery.zookeeper.storageDir: hdfs://hadoop-bd1:8020/flink/dw/recovery
#taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /etc/hadoop/conf
taskmanager.heap.size: 1024m
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb

flink 状态管理及容错

2.1 容错 checkpoint savepoint 机制

在程序中开启checkpoint机制

//开启checkpoint,并且设置消息消费机制为exactly_once,保证计算结果的准确性
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)
//设置backend的文件存放方式:本地文件 or hdfs(常用)
// env.setStateBackend(new FsStateBackend("file:///data/www/check_point"))
env.setStateBackend(new FsStateBackend("hdfs://10.1.0.4:8020/user/mp")) //checkpoint文件保留到hdfs
// 设置checkpoint在cancel任务时,保留checkpoint文件
env.getCheckpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//设置checkpoint文件的保存版本最多为2
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)

2.2 在hdfs中查看checkpoint文件信息

(1)hdfs dfs -ls /user/mp #每一次启动一个任务,都会随机生成一个jobid
(2)根据jobid查看当前checkpoint版本信息
/user/mp/ea69f28ec72647727d7b4284ae494589
(3)查看check文件
/user/mp/ea69f28ec72647727d7b4284ae494589/chk-2035 (用于恢复job的checkpoint路径)
/user/mp/ea69f28ec72647727d7b4284ae494589/shared
/user/mp/ea69f28ec72647727d7b4284ae494589/taskowned
(4)其中chk-2035 存有_metadata

2.3 根据checkpoint文件恢复job

bin/flink run -s -yid <指定applicationid> -yjm -ytm -ys -d -c


(1)从指定的checkpoint版本恢复job
(2)提交任务到指定的application
(3)可以重新调整 jm tm slots 参数...

2.4 手动触发保存savepoint

(1)savepoint也可以保存在hdfs中
(2)bin/flink savepoint -yid
(3)eg:bin/flink savepoint ea69f28ec72647727d7b4284ae494589 hdfs:///user/mp/savepoint -yid application_1557217934896_0105
(4)查看hdfs的savepoint文件:/user/mp/savepoint/savepoint-ea69f2-c740c68b737c,其中带有_metadata文件,用于恢复job

2.5 通过savepoint恢复任务

bin/flink run -d -s -yid -yjm -ytm -c


eg:bin/flink run -d -s hdfs:///user/mp/savepoint/savepoint-ea69f2-c740c68b737c -yid application_1557217934896_0105 -c com.kn.rt.KafkaWordCount /data/www/check_point/flink-checkpoint-1.0-SNAPSHOT.jar

2.6 初始化状态及快照管理

往往在新起一个job时,需要迭代计算外部存储的数据作为初始化

实现CheckPointedFunction接口类,实现initializeState接口,即可初始化state数据

继承RichSinkFunction 方法的同时,通过with 关键字继承CheckpointedFunction

class MySink extends RichSinkFunction[Tuple2[String,Long]] with CheckpointedFunction{
private var conn: COnnection= _
private var ps: Statement = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
try {
val prop = new Properties
try
prop.load(classOf[MysqlSink].getClassLoader.getResourceAsStream("db.properties"))
catch {
case e: IOException =>
e.printStackTrace()
}
val msyql_driver = prop.get("mysql_driver").asInstanceOf[String]
val msyql_url = prop.get("mysql_url").asInstanceOf[String]
val msyql_user = prop.get("mysql_user").asInstanceOf[String]
val msyql_passwd = prop.get("mysql_passwd").asInstanceOf[String]
Class.forName(msyql_driver)
cOnn= DriverManager.getConnection(msyql_url, msyql_user, msyql_passwd)
ps = conn.createStatement()
} catch {
case e: Exception =>
// logger.error("jdbc exception:{}",e);
e.printStackTrace()
}
}
override def invoke(value: (String, Long), context: SinkFunction.Context[_]): Unit = {
super.invoke(value, context)
ps.execute("insert into result1(word,num) values ('" + value._1 + "'," + value._2 + ")")
}
override def close(): Unit = {
super.close()
if (ps != null){
ps.close()
}
if (conn != null){
conn.close()
}
}
//初始化state数据,该方法在首次初始化时调用,或者在恢复前一个快照的时候执行
override def initializeState(context: FunctionInitializationContext): Unit = {
val state = context.getKeyedStateStore.getState(new ValueStateDescriptor[Tuple2[String,Long]]("wordCount",
TypeInformation.of(new TypeHint[Tuple2[String,Long]](){})))

}
//该方法在每次执行快照的时候执行
override def snapshotState(context: FunctionSnapshotContext): Unit = ???
}

3. flink资源管理

3.1 flink 先了解以下几个概念


3.1.1 JobManager


(1)jobmanager 主要是负责接收客户端的job,调度job,协调checkpoint等

(2)无论是standalone 还是flink on yarn 模式都会启动jobmanager 和taskmanager



3.1.2 TaskManager


(1)TaskManager执行具体的Task,也就是具体做事的

(2)taskmanager 对资源进行了隔离,引入了slot的概念

(3)taskmanager 和 Jobmanager通讯采用actor的机制



3.1.3 job


(1)client 提交的任务



3.1.4 task及subtask


(1)先介绍什么是subtask?每个算子的一个并行化实例就是一个subtask,多个subtask可以通过chain操作合成一个task,再作为一个整体task来调度执行,(task采用单独的线程,即链式操作中的执行是串行的)

(2)solt是一个jvm进程,一个solt中可以执行多个task(线程) ,同一个jvm中的task,可以共享TCP和心跳信息,减少网络传输,共享一些数据结构,在一定程度上减少task的消耗

(3)同一个job的不同task的subtask才能共享slot



3.1.5 slot 及 shared slot


(1)slots代表着TaskManager的一个固定的资源子集,一个taskmanager有3个solts,那么每个solt平分1/3的托管内存

(2)将资源solt化,意味着来自不同job的task不会为了内存而竞争,每个task拥有一定内存的储备;需要注意的是,slot只是隔离了内存,但没有隔离cpu

(3)尽可能地让多个task共享一个slot (SlotSharingGroup)



3.2 flink 资源分配与并行度管理(runtime)

[外链图片转存失败(img-2fO7ute0-1562289945279)(https://ci.apache.org/projects/flink/flink-docs-release-1.4/fig/tasks_chains.svg)]


(1)如上图:我们的数据流操作为source(p=2) -> map(p=2) -> keyby、window、apply(p=2) -> sink (p=1)

(2)source、map等非密集型操作,其并行度为2,所以我们可以将source和map链在一起,tm会在2个solt中分别启用task1: source[1]-> map[1]; task2: source[2] -> map[2]

(3)keyby、window、apply等密集型操作,并行度为2,需要做shuffle操作,涉及到slot间数据共享,会在上面的2个solt中分别启用 task3: keyby[1] 和task4: keyby[2] 两个task

(4)sink操作,并行度为1,那么只需要solt1中启用task5: sink[1] 即可,数据汇总做sink操作

(5)那么slot1中含有整个数据pipeline,solt1中有3个task:task1,task3,task5 ;slot2:task2 task4

(6)合理设置算子并行度



3.2.1 并行度


(1)slot是静态的概念,是指taskmanager具有的并发执行能力

(2)parallelism是动态的概念,是指程序运行时实际使用的并发能力,并行度不能大于可用solt数

(3)设置parallelism有多中方式,优先级为api>env>p>file

(4)Flink 集群所需的taskslots数与job中最高的并行度一致

image

备注:(这个task slot运行图,跟上图中的任务分配不一致仅供理解…)




推荐阅读
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • Servlet多用户登录时HttpSession会话信息覆盖问题的解决方案
    本文讨论了在Servlet多用户登录时可能出现的HttpSession会话信息覆盖问题,并提供了解决方案。通过分析JSESSIONID的作用机制和编码方式,我们可以得出每个HttpSession对象都是通过客户端发送的唯一JSESSIONID来识别的,因此无需担心会话信息被覆盖的问题。需要注意的是,本文讨论的是多个客户端级别上的多用户登录,而非同一个浏览器级别上的多用户登录。 ... [详细]
  • Kylin 单节点安装
    软件环境Hadoop:2.7,3.1(sincev2.5)Hive:0.13-1.2.1HBase:1.1,2.0(sincev2.5)Spark(optional)2.3.0K ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • RouterOS 5.16软路由安装图解教程
    本文介绍了如何安装RouterOS 5.16软路由系统,包括系统要求、安装步骤和登录方式。同时提供了详细的图解教程,方便读者进行操作。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • Asp.net Mvc Framework 七 (Filter及其执行顺序) 的应用示例
    本文介绍了在Asp.net Mvc中应用Filter功能进行登录判断、用户权限控制、输出缓存、防盗链、防蜘蛛、本地化设置等操作的示例,并解释了Filter的执行顺序。通过示例代码,详细说明了如何使用Filter来实现这些功能。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 微信官方授权及获取OpenId的方法,服务器通过SpringBoot实现
    主要步骤:前端获取到code(wx.login),传入服务器服务器通过参数AppID和AppSecret访问官方接口,获取到OpenId ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • struts2重点——ValueStack和OGNL
    一、值栈(ValueStack)1.实现类:OGNLValueStack2.对象栈:CompoundRoot( ... [详细]
  • 开发笔记:读《分布式一致性原理》JAVA客户端API操作2
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了读《分布式一致性原理》JAVA客户端API操作2相关的知识,希望对你有一定的参考价值。创 ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
author-avatar
傻孩纸黄国帅哟
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有