热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:FlinkFlink的slotSharingGroup有什么用

篇首语:本文由编程笔记#小编为大家整理,主要介绍了FlinkFlink的slotSharingGroup有什么用相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了FlinkFlink 的 slotSharingGroup 有什么用相关的知识,希望对你有一定的参考价值。






在这里插入图片描述


1.概述

转载并且补充:Flink控制任务调度:作业链与处理槽共享组(slot-sharing-group)

为了实现并行执行,Flink应用会将算子划分为不同任务,然后将这些任务分配到集群中的不同进程上去执行。和很多其他分布式系统一样,Flink应用的性能很大程度上取决于任务的调度方式。任务被分配到的工作进程、任务间的共存情况以及工作进程中的任务数都会对应用的性能产生显著影响。本节中我们就讨论一下如何通过调整默认行为以及控制作业链与作业分配(处理槽共享组)来提高应用的性能。

其实这两个概念我们可以看作:资源共享链与资源共享组。当我们编写完一个Flink程序,从Client开始执行——>JobManager——>TaskManager——>Slot启动并执行Task的过程中,会对我们提交的执行计划进行优化,其中有两个比较重要的优化过程是:任务链与处理槽共享组,前者是对执行效率的优化,后者是对内存资源的优化


2.执行过程

在这里插入图片描述
Chain:Flink会尽可能地将多个operator链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行

优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换(即降低本地数据交换成本),减少了延迟的同时提高整体的吞吐量

这一点请参考:【Flink】 collector 非常慢 一次尴尬的 排查错误方向 chain 与 不chain 的 区别

概述:在StreamGraph转换为JobGraph过程中,关键在于将多个 StreamNode 优化为一个 JobVertex,对应的 StreamEdge 则转化为 JobEdge,并且 JobVertex 和 JobEdge 之间通过 IntermediateDataSet (中间数据集)形成一个生产者和消费者的连接关系。每个JobVertex就是JobManger的一个任务调度单位(任务Task)。为了避免在这个过程中将关联性很强的几个StreamNode(算子)放到不同JobVertex(Task)中,从而导致因为Task执行产生的效率问题(数据交换(网络传输)、线程上下文切换),Flink会在StreamGraph转换为JobGraph过程中将可以优化的算子合并为一个算子链(也就是形成一个Task)。这样就可以把这条链上的算子放到一个线程中去执行,这样就提高了任务执行效率。

可见,StreamGraph转换为JobGraph过程中,实际上是逐条审查每一个StreamEdge和该SteamEdge两头连接的两个StreamNode的特性,来决定该StreamEdge两头的StreamNode是不是可以合并在一起形成算子链。这个判断过程flink给出了明确的规则,我们看一下StreamingJobGraphGenerator中的isChainable()方法:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
/** 获取StreamEdge的源和目标StreamNode */
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
/** 获取源和目标StreamNode中的StreamOperator */
StreamOperatorFactory<?> headOperator &#61; upStreamVertex.getOperatorFactory();
StreamOperatorFactory<?> outOperator &#61; downStreamVertex.getOperatorFactory();
/**
* 1、下游节点只有一个输入
* 2、下游节点的操作符不为null
* 3、上游节点的操作符不为null
* 4、上下游节点在一个槽位共享组内
* 5、下游节点的连接策略是 ALWAYS
* 6、上游节点的连接策略是 HEAD 或者 ALWAYS
* 7、edge 的分区函数是 ForwardPartitioner 的实例
* 8、上下游节点的并行度相等
* 9、可以进行节点连接操作
*/

//如果边的下游流节点的入边数目为1&#xff08;也即其为单输入算子&#xff09;
return downStreamVertex.getInEdges().size() &#61;&#61; 1
//边的下游节点对应的算子不为null
&& outOperator !&#61; null
//边的上游节点对应的算子不为null
&& headOperator !&#61; null
//边两端节点有相同的槽共享组名称
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
//边下游算子的链接策略为ALWAYS
&& outOperator.getChainingStrategy() &#61;&#61; ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() &#61;&#61; ChainingStrategy.HEAD ||
//上游算子的链接策略为HEAD或者ALWAYS
headOperator.getChainingStrategy() &#61;&#61; ChainingStrategy.ALWAYS)
//边的分区器类型是ForwardPartitioner
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() !&#61; ShuffleMode.BATCH
//上下游节点的并行度相等
&& upStreamVertex.getParallelism() &#61;&#61; downStreamVertex.getParallelism()
//当前的streamGraph允许链接的
&& streamGraph.isChainingEnabled();
}

3.处理槽共享组

处理槽共享组(出于某中目的将多个Task放到同一个slot中执行)


3.1 Task Slot

TaskManager 是一个 JVM 进程&#xff0c;并会以独立的线程来执行一个task。为了控制一个 TaskManager 能接受多少个 task&#xff0c;Flink 提出了 Task Slot 的概念&#xff0c;通过 Task Slot 来定义Flink 中的计算资源。solt 对TaskManager内存进行平均分配&#xff0c;每个solt内存都相同&#xff0c;加起来和等于TaskManager可用内存&#xff0c;但是仅仅对内存做了隔离&#xff0c;并没有对cpu进行隔离将资源 slot 化意味着来自不同job的task不会为了内存而竞争&#xff0c;而是每个task都拥有一定数量的内存储备

通过调整 task slot 的数量&#xff0c;用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot&#xff0c;也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话&#xff0c;也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task&#xff0c;可以共享TCP连接&#xff08;基于多路复用&#xff09;和心跳消息&#xff0c;可以减少数据的网络传输。也能共享一些数据结构&#xff0c;一定程度上减少了每个task的消耗。


3.2 共享槽

问题&#xff1a;

一个TaskManager中至少有一个插槽slot&#xff0c;每个插槽均分内存并且之间是内存隔离的&#xff0c;但是共享CPU。算子根据计算复杂度可以分为资源密集型与非资源密集型算子&#xff08;可以认为有的算子计算时内存需求大&#xff0c;有些算子内存需求小&#xff09;。现在有这么个情况&#xff1a;某个Job下的Tasks中既有资源密集型Task&#xff08;A&#xff09;&#xff0c;又有非资源密集型Task&#xff08;B&#xff09;&#xff0c;他们被分到不同的slot上&#xff0c;这就会产生问题&#xff1a;


  • 有的slot内存使用率大&#xff0c;有的slot内存使用率小&#xff0c;这样就很不公平&#xff0c;一个槽资源没有得到充分的利用&#xff1b;
  • 对于槽资源有限的情况&#xff0c;任务并行度也不高。

解决方案

在这里插入图片描述

默认情况下&#xff0c;Flink 允许subtasks共享slot&#xff0c;条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许槽共享&#xff0c;会有以下两个方面的好处&#xff1a;


  • 对于slot有限的场景&#xff0c;我们可以增大每个task的并行度。比如如果不设置SlotSharingGroup&#xff0c;默认所有task在同一个共享组&#xff08;可以共享所有slot&#xff09;&#xff0c;那么Flink集群需要的任务槽与作业中使用的最高并行度正好相同。但是如上图所示&#xff0c;如果我们强制指定了map的slot共享组为test&#xff0c;那么map和map下游的组为test&#xff0c;map的上游source的共享组为默认的default&#xff0c;此时default组中最大并行度为10&#xff0c;test组中最大并行度为20&#xff0c;那么需要的Slot&#61;10&#43;20&#61;30&#xff1b;

  • 能更好的利用资源&#xff1a;如果没有slot共享&#xff0c;那些资源需求不大的map/source/flatmap子任务将和资源需求更大的window/sink占用相同的资源&#xff0c;槽资源没有充分利用&#xff08;内存没有充分利用&#xff09;。


3.2.1 具体共享机制实现

Flink决定哪些任务需要共享slot 以及哪些任务必须放入特定slot。虽然task共享Slot提升资源利用率&#xff0c;但是如果一个Slot中容纳过多task反而会造成资源低下&#xff08;比如极端情况下所有task都分布在一个Slot内&#xff09;。所以在Flink中task需要按照一定规则共享Slot &#xff0c;主要通过SlotSharingGroup和CoLocationGroup定义&#xff1a;


  • CoLocationGroup&#xff1a;强制将subTasksk放到同一个slot中&#xff0c;是一种硬约束&#xff1a;


    • 保证把JobVertices的第n个运行实例和其他相同组内的JobVertices第n个实例运作在相同的slot中&#xff08;所有的并行度相同的subTasks运行在同一个slot &#xff09;&#xff1b;
    • 主要用于迭代流(训练机器学习模型) &#xff0c;用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。
  • SlotSharingGroup: 它是Flink中用来实现slot共享的类&#xff0c;尽可能的允许不同的JobVertices部署在相同的Slot中&#xff0c;但这是一种宽约束&#xff0c;只是尽量做到不能完全保证。


    • 算子的默认group为default&#xff0c;所有任务可以共享同一个slot&#xff1b;
    • 要想确定一个未做SlotSharingGroup设置的算子的group是什么&#xff0c;可以根据上游算子的 group 和自身是否设置 group共同确定&#xff08;也就是说如果下游算子没有设置分组&#xff0c;它继承上游算子的分组&#xff09;&#xff1b;
    • 为了防止不合理的共享&#xff0c;用户可以通过提供的API强制指定operator的共享组。因为不合理的共享槽资源&#xff08;比如默认情况下所有任务共享所有的slot&#xff09;会导致每个槽中运行的线程述增多&#xff0c;增加了机器负载。所以适当设置可以减少每个slot运行的线程数&#xff0c;从而整体上减少机器的负载。比如&#xff1a; someStream.filter(...).slotSharingGroup("group1")就强制指定了filter的slot共享组为group1。

3.2.2 案例

&#64;Test
public void slotSharingGroupTest() throws Exception {
Configuration configuration &#61; new Configuration();
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER,true);
configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,7088);
StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.createLocalEnvironment(1,configuration);
env.getConfig().enableObjectReuse();
DataStream<String> text &#61; env.socketTextStream("localhost", 9992, "\\n");
SingleOutputStreamOperator<String> map &#61; text.map(new MapFunction<String, String>() {
&#64;Override
public String map(String value) throws Exception {
return value;
}
});
SingleOutputStreamOperator<String> filter &#61; map.filter(new FilterFunction<String>() {
&#64;Override
public boolean filter(String value) throws Exception {
return true;
}
}).slotSharingGroup("group_03");
SingleOutputStreamOperator<String> bb &#61; filter.map(new MapFunction<String, String>() {
&#64;Override
public String map(String value) throws Exception {
return value;
}
});
SingleOutputStreamOperator<String> cc &#61; bb.filter(new FilterFunction<String>() {
&#64;Override
public boolean filter(String value) throws Exception {
return true;
}
}).slotSharingGroup("group_04");
SingleOutputStreamOperator<String> dd &#61; cc.map(new MapFunction<String, String>() {
&#64;Override
public String map(String value) throws Exception {
return value;
}
});
dd.print();
env.execute("xxx");
}

可以看到运行结果如下
在这里插入图片描述
每个slotSharingGroup不能互相chain在一起&#xff0c;每个slotSharingGroup内部的算子可以chain在一起。

但是有些疑问&#xff1a;


  1. 分开做性能很好嘛&#xff1f;

  2. 我们一般都是默认的&#xff0c;正确所有的chain在一起。这样效率更高吗&#xff1f;






推荐阅读
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • HDFS2.x新特性
    一、集群间数据拷贝scp实现两个远程主机之间的文件复制scp-rhello.txtroothadoop103:useratguiguhello.txt推pushscp-rr ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • CentOS 6.5安装VMware Tools及共享文件夹显示问题解决方法
    本文介绍了在CentOS 6.5上安装VMware Tools及解决共享文件夹显示问题的方法。包括清空CD/DVD使用的ISO镜像文件、创建挂载目录、改变光驱设备的读写权限等步骤。最后给出了拷贝解压VMware Tools的操作。 ... [详细]
  • 本文介绍了如何清除Eclipse中SVN用户的设置。首先需要查看使用的SVN接口,然后根据接口类型找到相应的目录并删除相关文件。最后使用SVN更新或提交来应用更改。 ... [详细]
author-avatar
丝兰郁香
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有