热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:如何快速低成本开发功能强大的分布式任务调度系统

篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何快速低成本开发功能强大的分布式任务调度系统相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何快速低成本开发功能强大的分布式任务调度系统相关的知识,希望对你有一定的参考价值。







文章首发微信公众号:码上观世界,本文字数9000+,推荐收藏细品。


01


分布式任务调度系统指的是什么


本文将要讲的分布式任务调度系统指的是分布式任务编排系统,主要关注任务的编排和调度等功能,解决的是单机单任务定时触发(如Linux的crontab)无法满足多任务管理、依赖调度、任务运行监控和分布式执行等问题。



尽管目前流行的流处理引擎Flink、通用大数据计算框架Spark和离线计算框架MapReduce、Tez等在底层实现上具备完整的任务编排、调度、执行、资源管理和监控等功能,从一定意义讲也可以认为是分布式调度系统,但从数据处理的来源角度讲,他们的源头是一样的。这里的分布式任务只是一个整体应用系统的内部拆分,这里姑且叫它们分布式计算引擎。


本文的分布式任务编排系统的任务是完全异构的,处理的数据、任务类型和业务功能可能完全不一样,任务的编排和属性配置如优先级等也基本需要用户指定,相比上述系统,这里的任务业务属性更强,用户的参与度也最高。另一类相似系统是业务流程管理系统(BPM),如Activiti等,它的突出差异不仅在任务编排上需要用户指定,而且在任务执行的每个环节也需要用户参与,常用于企业内部工作审批流等场景。



这三类系统的共同点都是将任务编排成有向无环图(DAG),执行完DAG的耗时程度依次为分布式计算引擎如Spark等(不考虑实时在线的流处理系统)、分布式任务编排系统、业务流程管理系统。业务流程管理系统的一个流程走完可能需要数天,数周不等。


02


分布式任务编排系统有哪些功能


同分布式计算引擎一样,分布式任务编排系统主要面向大数据应用场景,这样的系统很多,比较知名的系统有Apache Oozie、Azkaban、Apache  Airflow和最近刚进入Apache孵化器的DolphinScheduler 。在了解分布式任务编排系统需要具备什么样的功能,我们以DolphinScheduler为例,从其官方提供的系统架构图来反向推导:



架构图最下层是该系统支持的任务类型,如Flink、Shell、SQL、Python、MR、Spark等(图例中居然有错别字,估计是前端排版小编画的,我们当作没看见),这涉及到脚本类型管理、脚本的上传、下载、版本管理等,对于SQL任务还涉及到UDF函数管理等。


其上是调度系统的Scheduler、Worker,以及任务持久化DB存储系统和分布式协调系统Zookeeper。Scheduler负责任务的调度分发,按照一定调度规则确保任务不重不漏,Worker负责任务的执行。机器的负载均衡和故障转移依赖具体的实现策略不同,可以在Scheduler也可以在Worker端实现,任务状态等信息通过DB持久化和传递,Zookeeper通过分布式锁实现分布式任务分配、Scheduler中多Server的选举以及其他消息监听等。这里涉及到的功能模块主要包括集群管理、调度管理和任务运行管理等。其中集群管理主要涉及到集群的高可用、虚拟化和服务器的动态扩缩容等。调度管理涉及到负载均衡、调度机制、资源管理和血缘管理等。运行管理涉及到任务运行时的异常处理、补数据、日志管理、告警管理和统计报表等。


最上层是用户界面UI,用于任务的维护、运行状态监控及其他报表可视化等,API是UI或者其他外部系统跟调度系统交互的统一入口。这涉及到任务管理和权限管理等,其中任务管理包括任务的增删改查操作、任务编排、任务上下线,有些任务类型,比如数据同步涉及到数据源管理等。权限管理涉及到用户创建等用户信息的维护,角色管理,有些调度系统根据使用场景还涉及到租户管理等。


除此之外,调度系统为了提高易用性和拥抱技术生态,还需要做大量的工作,比如跟云厂商合作,提供上云工具,数据导入导出工具、与开源大数据技术融合,比如Hive、Spark、Flink等。特别的,面向大数据使用场景,数据开发的标准和规范、数据治理管理等等也是很重要的功能,完全依靠用户自觉很难做到。归纳这些功能如下:




03


分布式任务编排系统的普遍性和特殊性


上面列举的功能不是调度系统全部必备的,那么哪些功能是必须的呢?或者说,调度系统具备什么样的功能才能跟其他调度系统区分开来,每一种完整意义上的调度系统都具有任务管理、运行管理、调度管理、集群管理等功能,其特殊性表现在哪里呢?这就好比问,社会主义制度跟资本主义制度的区别在哪?是是否采用市场经济制度?还是是否采用现代科学技术?虽然不同制度的市场经济形式和科学技术发展路线不同,但显然还不能用来区分两种社会制度,因为这不是本质的区别,生产关系才是!同样,对不同人种,如何区分?靠秉性、肤色、语言、信仰也许可以,但这些只是表象特征,不是普遍性的东西。马克思说,“人的本质是社会关系的总和”,社会活动的结果主要以记忆的形式存在于人脑,人体的其他部位可以没有或者替换,但人脑不可以。如果未来可以,那就不再是这个人而是另一个人了,所以有根据暗号识别人的,虽然不精确,但至少能判断是一伙儿的,因为有相同的知识背景。对不同的车型,虽然人们更直接关注舒适性、操控性、性价比、空间、油耗等,但对一款车真正核心的是发动机,变速箱,底盘等核心部件。


前面提到的Flink和Spark,它们都是计算框架,都可以用于实时流处理,那么它们的独特性区别在哪呢?是流模型、状态管理和容错机制,比如Spark基于微批抽象流模型,而Flink基于事件机制抽象流模型,微批机制是事件机制的特殊形式,因为抽象方式的区别,导致在时间延迟方面的巨大差异,而状态管理和容错机制的不同实现方式对实时流的性能影响和服务级别的影响很大。回过头来看,调度系统的普遍性,其核心是什么?本文认为是调度管理,其他功能也不是不重要,而是,不是少了这部分功能,系统就无法运转,它们可以通过其他方式来替代。这是我们站在系统客观认识的角度进行的价值判断,实际上,价值取决于主体认识。比如一辆车核心价值是什么,不同的车企判断也不一样,整车企业也许认为车辆外观设计与空间布局更重要,其他都可以靠采购实现。零部件厂商也许决定发动机是核心,他们可以专注于发动机的研发和生产。对于调度系统,大厂也许会关注大规模集群的调度性能和可靠性,小厂也许更关注系统能否正常跑起来,至于集群是否必要存在都值得讨论,更别提研发了。实际上对于大小厂几乎都是采用拿来主义,这是开源软件市场的生态贡献。下面选取几个主要功能点详细讨论之。


04


调度功能实现原理


从任务元数据存储数据结构来看,主要有以下几种:


基于队列:待调度的任务按照一定顺序,比如按照优先级、运行时间排序,排在队列头部的任务先获得调度机会。每个任务的运行时间可以事先根据依赖关系提前计算好,可以减少调度时的条件检查,提高调度效率。



基于树结构:树形结构反映了任务的依赖关系,调度顺序按照树的层次遍历方式,从树根到树叶的过程,因为底层任务依赖上层任务,所以调度顺序比较符合依赖逻辑。相比队列的方式,不必对任务进行排序。采用这种存储方式的调度系统有Azkaban等。



基于多级反馈队列(Multilevel Feedback Queue,MFQ):该方式借鉴操作系统中的进程调度算法,通过动态调整任务优先级进入不同的队列,可以解决任务长时间得不到调度导致的“饿死”问题。队列可以简单把任务分为几个等级,比如高优先级、中优先级和低优先级几个等级,每个等级分别对应一个队列,每个队列内部可以采用队列或者树形结构,分别采用不同的调度策略,高优先级队列的任务开抢占低优先级队列中的任务,主流调度系统或多或少借鉴了这种思想,如Airflow,DolphinScheduler,参照下面的示例图:



按照任务触发方式划分主要有轮询方式和事件触发方式,轮询方式使用比较广泛,比如Azkaban通过循环遍历任务元数据树,检查满足执行条件的任务节点,Airflow通过定时扫描元数据库,获取满足执行条件的任务列表。相比轮询方式,基于事件的机制更高效,减少CPU空转的机会,比如Oozie每个任务调度结束,会通过回调通知OozieServer,从而决定下一步的当作。DolphinScheduler通过Quartz管理所有的调度任务,但Quartz是基于定时事件触发。尽管看起来事件机制相比轮询高效,但是存在不可靠的问题,比如事件异常导致缺失调度,因此实践中会有补救措施,比如Oozie会结合使用轮询和事件通知机制。


05


资源管理实现原理


进行资源管理,需要明确知道各Worker节点的资源情况,可以参照YARN的方式,通过NodeManager定期将各Worker节点的资源信息汇报到ResourceManager,资源信息通常以二元对表示:



简单介绍YARN里面的几种组件:


  • ResourceManager(RM):负责对各NM上的资源进行统一管理和调度,将AM分配空闲的Container运行并监控其运行状态。对AM申请的资源请求分配相应的空闲Container。主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager)。

  • 调度器(Scheduler):调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位是Container,从而限定每个任务使用的资源量。Scheduler不负责监控或者跟踪应用程序的状态,也不负责任务因为各种原因而需要的重启(由ApplicationMaster负责)。总之,调度器根据应用程序的资源要求,以及集群机器的资源情况,为用程序分配封装在Container中的资源。调度器是可插拔的,例如CapacityScheduler、FairScheduler。(PS:在实际应用中,只需要简单配置即可)

  • 应用程序管理器(Application Manager):应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动AM、监控AM运行状态并在失败时重新启动等,跟踪分给的Container的进度、状态也是其职责。ApplicationMaster是应用框架,它负责向ResourceManager协调资源,并且与NodeManager协同工作完成Task的执行和监控。MapReduce就是原生支持的一种框架,可以在YARN上运行Mapreduce作业。有很多分布式应用都开发了对应的应用程序框架,用于在YARN上运行任务,例如Spark,Storm等。如果需要,我们也可以自己写一个符合规范的YARN application。

  • NodeManager(NM):NM是每个节点上的资源和任务管理器。它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;同时会接收并处理来自AM的Container 启动/停止等请求。ApplicationMaster(AM):用户提交的应用程序均包含一个AM,负责应用的监控,跟踪应用执行状态,重启失败任务等。

  • Container:是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container 表示的。YARN会为每个任务分配一个Container且该任务只能使用该Container中描述的资源。


06


负载均衡实现原理


在调度实现原理介绍中,如果采用队列的先进先出(FIFO Scheduler)调度策略,加入某队头的任务需要较大资源,而当前集群无法满足,就会导致后续任务的阻塞,小任务反而无法运行,为了解决该问题,可以使用多队列的方式(Capacity Scheduler),并且限制每个队列最小和最大可用资源大小。多队列方式无法最大化利用集群资源,比如当较大任务进入集群,虽然当前集群其他队列为空,也无法利用其资源,于是需要一种公平调度策略(Fair Scheduler),与Capacity Scheduler相比,可做到资源共享和抢占。当其他队列为空,可用使用其他队列的资源,当其他队列提交了任务,如果当前任务一定时间内没有归还资源,可以强制回收占用的资源。YARN提供了上述三种资源调度方式,但是Mesos的调度算法应用更典型,这里以Mesos为例介绍,Mesos采用2级调度方式:应用框架将申请到的资源在框架内部二次分配,如下图:



Mesos的资源分配算法解决的问题是,将当前空闲资源分配给哪些框架以及分配多少给框架,其最重要的资源分配有两种算法:


1


最大最小公平算法


最大最小公平算法(Max-  minFairness,MMF)是一种在兼顾公平的前提下,尽可能让更多人满意的资源分配算法。为什么这么说呢?因为这个算法有 3 个主要原则:


  • 按照用户对资源需求量递增的顺序进行空闲资源分配;

  • 不存在用户得到的资源超过自己需求的情况;

  • 对于分配的资源不满足需求的用户,所获得的资源是相等的。


在执行资源分配时,最大最小公平算法按照上述 3 条原则进行多次迭代,每次迭代中资源均平均分配,如果还有剩余资源,就进入下一次迭代,一直到所有用户资源得到满足或集群资源分配完毕,迭代结束。


假设,现在有总量为 100 的空闲资源,有 4 个用户 A、B、C、D 对该资源的需求量分别为(35,10,25,45),分配流程如下所示:


1. 按照用户对资源的需求量升序排列,则 4 个用户的需求量为(B:10,C:25,A:35,D:45)。


2. 平均分配空闲资源。资源空闲总量 100,除以用户数 4,则平均空闲资源量为 25;按照第一步中需求量分配后,用户资源需求量为(0,0,10,20),且用户 B 由于资源需求量小于 25,因此会剩余资源。此时空闲资源量为 15,资源需求人数为 2。


3. 重复第二步,平均分配资源,15/2=7.5,即分别为用户 A 和 D 分配 7.5 份资源,此时用户资源需求量为(0,0,2.5,12.5),空闲资源量为 0,资源需求人数为 2。


4. 所有资源已分配完,算法终止。至此,对于需求量为(10,25,35,45)的用户们来说,分配到的资源是(10,25,32.5,32.5)。这个算法的另外一个结束条件是,资源分配满足了所有用户的资源需求,即当没有用户有资源需求时,算法也会终止。


2


主导资源公平算法


最大最小公平算法采用了绝对公平的方式分配资源,会导致大量的资源浪费,比如用户需求量为 35 和 45 的用户 A 和用户 D,均分配了 32.5 的空闲资源,但由于资源不满足需求,这两个用户均无法使用。


而主导资源公平算法(Dominant Resource Fairness,DRF)考虑用户公平性的前提下,还考虑了用户对不同资源类型的需求,以尽可能地合理分配资源。也就是说,同样的资源量,主导资源公平算法可以尽可能地满足更多的用户。


在 Mesos 中,框架对资源的需求往往包括对 CPU、内存等多种类型资源的需求。针对多种资源的需求,主导资源公平算法首先计算已经分配给用户的每一种资源的占用率(Resource Share),比如已经分配的 CPU 占总资源量的多少,已经分配的内存占总资源量的多少。所有资源占用率中的最大值称作该用户的主导资源占用率,而主导资源占用率对应的资源就是用户的主导资源。


假设系统中的资源共包括 18 个 CPU 和 36 GB 内存&#xff0c;有两个 Framework&#xff08;Framework A 和 Framework B&#xff09;分别运行了两种任务&#xff0c;假设 Framework A 运行内存密集型任务&#xff0c;Framework B 运行 CPU 密集型任务&#xff0c;且每个任务所需要的资源量是一致的&#xff0c;分别是 <2 CPU, 8 GB> 和 <6 CPU, 2 GB>。


1&#xff0c;计算资源分配量。


假设 x 和 y 分别是 Framework A 和 Framework B 分配的任务数&#xff0c;那么 Framework A 消耗的资源为{2x CPU&#xff0c;8x GB}&#xff0c;Framework B 消耗的资源数为{6y CPU&#xff0c;2y GB}&#xff0c;分配给两个 Framework 的总资源量为&#xff08;2x&#43;6y&#xff09;个 CPU 和&#xff08;8x&#43;2y&#xff09;GB 内存。


2&#xff0c;确定主导资源。


对于 Framework A 来说&#xff0c;每个任务要消耗总 CPU 资源的 2/18&#xff0c;总内存资源的 8/36&#xff0c;所以 Framework A 的主导资源为内存&#xff1b;对于 Framework B 来说&#xff0c;每个任务要消耗总 CPU 资源的 6/18 和总内存资源的 2/36&#xff0c;因而 Framework B 的主导资源为 CPU。


3&#xff0c;DRF 算法的核心


是平衡所有用户的主导资源占用率&#xff0c;尽可能试图最大化所有用户中最小的主导资源占用率。通过求解下列公式&#xff0c;可以计算出 Framework A 和 Framework B 分配的任务数&#xff0c;并且要在满足公式的条件下&#xff0c;使得 x 和 y 越大越好。



通过求解可以得出&#xff1a;x&#61;3&#xff0c;即 Framework A 可以运行 3 个任务&#xff1b;y&#61;2&#xff0c;即 Framework B 可以运行 2 个任务。这样分配的话&#xff0c;每个 Framework 获取了相同比例的主导资源&#xff0c;即&#xff1a;A 获取了 2/3 的内存&#xff0c;B 获取了 2/3 的 CPU&#xff0c;从而在主导资源上体现了调度算法的公平性。


在实际任务分配过程中&#xff0c;主导资源率是根据已经分配给 Framework 的资源&#xff0c;占集群中总资源量的多少进行计算的&#xff0c;并且在每次分配过程中&#xff0c;会选择主导资源最小的 Framework 进行分配&#xff0c;也就是试图最大化所有用户中最小的主导资源占用率。


07


集群高可用


调度系统的高可用指的是Scheduler和Worker的稳定性和可靠性&#xff0c;因为部署架构上&#xff0c;Scheduler和Worker通常是1对多或者多对多的关系&#xff0c;因此要求这两各组件有一定的扩展性&#xff0c;最好能做到动态伸缩。比如Airflow的架构中&#xff0c;Scheduler和Worker解耦&#xff0c;独立部署&#xff0c;通过数据库交互信息。图中Scheduler只有单个节点&#xff0c;实际上&#xff0c;Scheduler和Worker相互独立&#xff0c;互不影响&#xff0c;Scheduler重启对worker影响也不大&#xff0c;所以通过Linux服务器上的服务进程管理其状态即可。而在DolphinScheduler 中&#xff0c;通过ZooKeeper做了多Scheduler选主机制。



08


分布式任务调度系统快速开发案例


本文以Airflow为例介绍如何开发分布式调度系统&#xff0c;为什么要选用Airflow&#xff0c;理由有以下几点&#xff1a;


1. Airflow具备上述列举的完整调度系统功能&#xff0c;如任务管理、集群管理、调度管理、运行管理、脚本管理和权限管理等&#xff0c;而且这些功能模块不同于现有各调度系统的实现方式&#xff0c;在易用性、可靠性、扩展性、生态兼容性等方面都完胜现有类似系统。


2. 在任务管理上&#xff0c;Airflow提供基于git代码仓库拓展用户脚本程序&#xff0c;提供基于Python API的方式来管理任务和任务编排&#xff0c;对于熟悉Python的用户来说&#xff0c;绝对是一大福音。


3. Airflow基于分布式架构&#xff0c;解耦调度器、工作机器、Web UI等系统组件&#xff0c;这些组件本身相互独立&#xff0c;且通过Linux服务器本身的服务管理&#xff0c;能够做到自动失败重启&#xff0c;进一步提高系统的可用性。另外&#xff0c;对于Scheduler HA业界也有相关的三方组件&#xff0c;对于系统资源的监控&#xff0c;业界也有Flower组件可用。


4. Airflow 各组件不仅可以自由扩展&#xff0c;而且都还支持虚拟化&#xff0c;基于K8s架构能够实现自动扩缩容。更重要的是Airflow提供了一种调度框架&#xff0c;支持不同的资源管理集群&#xff0c;如Celery、Mesos和K8s 。在任务分发消息处理上&#xff0c;支持Redis&#xff0c;RabbitMQ、mysql等&#xff0c;可满足不同场景的需要。


5. 在运行管理上&#xff0c;支持任务日志的本地存储和S3远程存储&#xff0c;Airflow Web UI提供丰富的异常处理、诊断功能和集群管理功能&#xff0c;如 DAG管理、任务重跑、补数据、任务历史记录查询、日志查看、数据源管理和队列管理等。


6. 完善的生态系统&#xff0c;Airflow 跟云厂商和大数据生态技术紧密结合&#xff0c;将调度系统使用的常见功能&#xff0c;如数据迁移、数据查询、数据处理和云服务访问等提供了完善便捷的工具&#xff0c;支持主流的Amazon、Google、Azure等一系列产品。在工具研发上&#xff0c;接纳了广泛的开源贡献。


7. Airflow系统内置了大量的算子Operator&#xff0c;Operator是基本的任务调度单位&#xff0c;比如Python脚本类型任务PythonOperator、Mysql迁移到Hive的任务MySqlToHiveTransfer&#xff0c;操作Hive的任务HiveOperator&#xff0c;运行Docker命令的任务DockerOperator等等&#xff0c;应有尽有&#xff0c;除此之外&#xff0c;还有大量三方贡献的任务类型。


上述只是列举的几点理由&#xff0c;还有更多&#xff0c;不再赘述&#xff0c;可以说&#xff0c;在Airflow中没有做不到的事情&#xff0c;只有想不到的事情。Airflow功能既然如此强大&#xff0c;那缺点呢&#xff1f;一个直接的就是缺少任务维护的界面操作&#xff0c;Airflow Web UI提供主要是“读”类的操作&#xff0c;“写”类的操作不多&#xff0c;“写”类操作主要通过编程实现&#xff0c;于是对非技术用户就有一定门槛。毕竟Airflow也有自己的受众&#xff0c;UI收到用户操作习惯和审美标准的影响&#xff0c;而对编程的方式&#xff0c;大家的认同感还是普遍的&#xff0c;所以Airflow不提供这类UI也算是明智的&#xff0c;完全可以交给用户自己来开发&#xff0c;我们来看看如何开发自己的Web UI界面&#xff0c;提供基本的任务创建、修改、依赖管理和调度运行等功能。


1


任务管理


可以独立管理每种任务类型&#xff0c;比如通过下图展示任务列表&#xff0c;来进行任务详情的查看&#xff0c;编辑&#xff0c;删除、历史执行记录查看等。



接下来&#xff0c;下面图中以数据迁移类型为例来看任务创建和编辑&#xff0c;流程走完后一条完整的任务记录被保存下来&#xff0c;用于后续的任务编排。



再比如下图&#xff0c;可以创建数据分析类的任务&#xff0c;支持纯界面操作和自定义SQL&#xff1a;



2


任务编排


任务编排将不同类型的任务按照一定的处理逻辑形成拓扑依赖结构&#xff0c;下图中左边可以拖拉不同类型的节点&#xff08;包括开始和结束的虚拟节点&#xff09;&#xff0c;拖到画板上形成DAG依赖关系。任务类型支持常见的数据迁移、数据分析等。



所有已经创建的编排都可以在列表中查询到&#xff0c;然后可以进行重新修改、查看DAG的调度记录等&#xff0c;然后看看任务编排如何存储&#xff0c;比如下图中&#xff0c;以一个实际任务节点为例&#xff0c;DAG的编排信息通过JSON格式持久化&#xff1a;



{


    "edges":[


        {


            "id":"d907f002",


            "index":1,


            "source":"9945c6b8",


            "target":"534e8af8"


        },


        {


            "id":"b244abfa",


            "index":3,


            "source":"9945c6b8",


            "target":"e088c942"


        },


        {


            "id":"d5c431e7",


            "index":5,


            "source":"534e8af8",


            "target":"e8a13959"


        },


        {


            "id":"6e70ac20",


            "index":6,


            "source":"e088c942",


            "target":"e8a13959"


        },


        {


            "id":"eed65d70",


            "index":9,


            "source":"e8a13959",


            "target":"ba51555b"


        }


    ],


    "nodes":[


        {


            "id":"9945c6b8",


            "type":"node",


            "index":0,


            "label":"开始"


        },


        {


            "id":"e088c942",


            "type":"node",


            "index":2,


            "label":"ods_rectify_notice_df",


            "taskId":75,


            "taskType":0,


            "projectId":6


        },


        {


            "id":"e8a13959",


            "type":"node",


            "index":4,


            "label":"dwd_project_success_df",


            "taskId":783,


            "taskType":2,


            "projectId":6


        },


        {


            "id":"534e8af8",


            "type":"node",


            "index":7,


            "label":"ods_joint_acceptance_df",


            "taskId":68,


            "taskType":0,


            "projectId":6


        },


        {


            "id":"ba51555b",


            "type":"node",


            "index":8,


            "label":"结束"


        }


    ]


}


持久化信息中&#xff0c;主要保存了节点和边&#xff0c;其中节点信息主要包括任务id、类型、展示标签等&#xff0c;其他关于展示用到的颜色、位置等信息已略去。边信息表示了节点的依赖关系&#xff0c;每条依赖关系都用一条链接的原节点和目标节点的边表示。


3


动态创建DAG


任务的拓扑依赖关系中保存了重要的节点和边信息&#xff0c;通过节点和表能够很自然地动态映射到Airflow DAG&#xff0c;关键逻辑见下图&#xff1a;



其中create_operator将UI中的任务类型转换成Airflow中的Operator&#xff0c;这就用到了Airflow的自定义Operator功能&#xff0c;结合Airflow中内置的Operator和Hook&#xff0c;实现起来再容易不过了。


4


任务重复调度问题解决



比如上图的任务依赖关系中任务t5依赖t1&#xff0c;t2&#xff0c;t3&#xff0c;任务t6依赖任务t3&#xff0c;t4&#xff0c;因为任务t3被共同依赖&#xff0c;就存在重复执行的问题&#xff0c;如果我们把这整个依赖图放在一个大整体的DAG中&#xff0c;如图蓝色的DAG3中&#xff0c;通过可视化&#xff0c;我们很容易看出哪些任务重复&#xff0c;但实际上&#xff0c;可能没有那么容易&#xff0c;原因是调度系统的任务通常面向部门或者整个公司&#xff0c;任务量成千上万&#xff0c;不可能在一个DAG中展示&#xff0c;而且不利于团队协作。


即使在展示时候分区分权限展示&#xff0c;但面向整体任务调度&#xff0c;下层任务获得调度的机会会更低&#xff0c;增大了任务的延迟度。一个可行的方式是按照工作小组或者业务逻辑组划分不同的子DAG&#xff0c;如图红色的DAG1和绿色的DAG2&#xff0c;每个子DAG有独立的调度属性&#xff0c;如调度时间&#xff0c;周期、告警通知人等。尽管划分子DAG&#xff0c;仍然没有解决重复任务的问题&#xff0c;但这个问题不是用户关心的&#xff0c;而是调度系统应该关心的问题&#xff0c;因此需要从系统层面来发现和解决&#xff0c;怎么做呢&#xff1f;


假设上述所有任务都是同一周期&#xff0c;比如都是每日调度一次&#xff0c;那么每次调度都会生成如下调度记录:


&#xff08;dag_id,task_id,execution_date,start_time,end_time,status,retry_num&#xff09;

将上面的存储表字段dag_id,task_id,execution_date设为唯一键&#xff0c;任务调度前&#xff0c;尝试向表插入该调度记录&#xff0c;如果插入成功&#xff0c;说明没有重复。反之&#xff0c;如果插入失败&#xff0c;则存在几种情况&#xff1a;


1. 如果当前调度任务状态为成功&#xff0c;可能是其他DAG中已经执行过&#xff0c;此时可以直接忽略。如果是外部人工触发DAG&#xff0c;因为人工触发DAG&#xff0c;其execution_date跟自动调度不一样&#xff0c;会产生唯一键冲突&#xff0c;此时需要重置任务状态&#xff0c;重新调度。


2. 如果当前调度任务状态为失败&#xff0c;需要验证失败重试次数是否超过一定次数&#xff0c;如果超过&#xff0c;跳过本次调度&#xff0c;否则继续调度&#xff0c;同时更新调度次数。


另外&#xff0c;在任务开发阶段&#xff0c;可能需要频繁手动触发任务&#xff0c;这个时候就不应该走插入记录开始的处理逻辑&#xff0c;可以直接忽略&#xff0c;允许无限次重跑记录。完整的处理逻辑如下图&#xff1a;



09


小结


本文系统介绍了分布式任务编排系统应该具备的功能以及根据价值取舍所看重的功能点&#xff0c;并且跟其他类似系统做了区分&#xff0c;然后结合案例讲述了如何低成本的开发分布式调度系统&#xff0c;Airflow以其强大的功能&#xff0c;本文没有全面挖掘&#xff0c;这里以一个角度探讨了如何在其基础上开发适合自己需要的UI以及跟现有其他调度系统并存。通过案例可知&#xff0c;开发者只需要开发自己的UI和相关配置信息的持久化&#xff0c;通过一个适配器即可对接Airflow&#xff0c;这样就可以做到一套UI&#xff0c;多种调度内核&#xff0c;还可以使用现有系统比如DolphinScheduler 的UI&#xff0c;Airflow的调度内核。因为Airflow作为调度框架&#xff0c;可以对接不同类型的集群&#xff0c;这样一来&#xff0c;Airflow就成了事实上的万能胶调度框架&#xff01;


10


参考资料


http://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/YARN.html


https://oozie.apache.org/docs/5.2.1/DG_Overview.html


https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/architecture-design.html


https://mp.weixin.qq.com/s/K-CQJaorfEkCJSchyTQ6gg


公众号后台回复“低成本调度系统开发”&#xff0c;可获取文章pdf版本和后续更新。





推荐阅读
author-avatar
mobiledu2502924293
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有