热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkFLinkPipelineExecutorFactory基于工厂模式的任务提交与SPI机制

篇首语:本文由编程笔记#小编为大家整理,主要介绍了FlinkFLink PipelineExecutorFactory 基于工厂模式的任务提交与SPI机制相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了FlinkFLink PipelineExecutorFactory 基于工厂模式的任务提交与SPI机制相关的知识,希望对你有一定的参考价值。







1.概述

转载: Flink1.10基于工厂模式的任务提交与SPI机制

Flink任务执行模式包含了yarn-session、standalone、per-job、local, 在1.10中又增加k8s的执行模式,那么在任务提交过程中如何根据不同的执行模式进行任务提交呢?

主要通过两个接口来实现:PipelineExecutorFactoryPipelineExecutorPipelineExecutorFactory用于在不同模式下创建不同的PipelineExecutor, 用于提交任务,PipelineExecutorFactory表示的一个创建执行器工厂接口,PipelineExecutor 表示一个执行器接口,正如你所想这里使用的就是经典的工厂设计模式,在任务提交过程中会根据不同的提交模式, 使用不同的PipelineExecutorFactory创建不同的PipelineExecutor。

public interface PipelineExecutorFactory
/**
* Returns the name of the executor that this factory creates.
*/

String getName();
/**
根据configuration判断是否满足当前的factory
*/

boolean isCompatibleWith(final Configuration configuration);
/**
* 获取对应模式下的executor
*/

PipelineExecutor getExecutor(final Configuration configuration);

PipelineExecutorFactory几个实现分别为:


  1. LocalExecutorFactory(local)

  2. RemoteExecutorFactory(standalone)

  3. YarnJobClusterExecutorFactory(per-job)

  4. YarnSessionClusterExecutorFactory(yarn-session)

public interface PipelineExecutor


/**
* 执行任务
*/

CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;

PipelineExecutor对应实现:


  1. RemoteExecutor(standalone)

  2. LocalExecutor(local)

  3. YarnJobClusterExecutor(per-job)

  4. YarnSessionClusterExecutor(yarn-session)


那么具体是如何选择factory呢&#xff1f;由PipelineExecutorServiceLoader接口来完成&#xff0c;其只有一个实现类DefaultExecutorServiceLoader&#xff0c; 透过命名你可能会才想到这里面用到了ServiceLoader&#xff0c;你的猜想是正确的&#xff0c;它就是通过SPI机制去加载flink所提供的不同factory&#xff0c;在META-INF.services 下可以找到其对应的配置&#xff1a;


DefaultExecutorServiceLoader.java部分源码

//SPI机制
private static final ServiceLoader<PipelineExecutorFactory> defaultLoader &#61; ServiceLoader.load(PipelineExecutorFactory.class);


//获取对应的factory
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration)
checkNotNull(configuration);


final List<PipelineExecutorFactory> compatibleFactories &#61; new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories &#61; defaultLoader.iterator();
while (factories.hasNext())
try
final PipelineExecutorFactory factory &#61; factories.next();
//判断标准 根据任务启动配置
if (factory !&#61; null && factory.isCompatibleWith(configuration))
compatibleFactories.add(factory);

catch (Throwable e)
if (e.getCause() instanceof NoClassDefFoundError)
LOG.info("Could not load factory due to missing dependencies.");
else
throw e;



//只能有一个factory符合要求
if (compatibleFactories.size() > 1)
final String configStr &#61;
configuration.toMap().entrySet().stream()
.map(e -> e.getKey() &#43; "&#61;" &#43; e.getValue())
.collect(Collectors.joining("\\n"));


throw new IllegalStateException("Multiple compatible client factories found for:\\n" &#43; configStr &#43; ".");

return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);

ServiceLoader.load(PipelineExecutorFactory.class) 会从类路径的META-INF.services下找到PipelineExecutorFactory的全路径文件&#xff0c;然后实例化出所有的factory&#xff0c;通过PipelineExecutorFactory.isCompatibleWith找到匹配的factory。

此处的PipelineExecutor体系参考&#xff1a;【Flink】Flink PipelineExecutor 体系







推荐阅读
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • MongoDB Aggregates.group() 方法详解与编程实例 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • 本文详细介绍了在 Android 7.1 系统中调整屏幕分辨率和默认音量设置的方法。针对系统默认音量过大的问题,提供了具体的步骤来降低系统、铃声、媒体和闹钟的默认音量,以提升用户体验。此外,还涵盖了如何通过系统设置或使用第三方工具来优化屏幕分辨率,确保设备显示效果更加清晰和流畅。 ... [详细]
  • Netty框架中运用Protobuf实现高效通信协议
    在Netty框架中,通过引入Protobuf来实现高效的通信协议。为了使用Protobuf,需要先准备好环境,包括下载并安装Protobuf的代码生成器`protoc`以及相应的源码包。具体资源可从官方下载页面获取,确保版本兼容性以充分发挥其性能优势。此外,配置好开发环境后,可以通过定义`.proto`文件来自动生成Java类,从而简化数据序列化和反序列化的操作,提高通信效率。 ... [详细]
  • 2017-09-07前端日报精选JavaScriptEventLoop机制详解与Vue.js中实践应用Redux基础与实践如何用js获取虚拟键盘高度?( ... [详细]
  • 工作原理_一文理解 Spark 基础概念及工作原理
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了一文理解Spark基础概念及工作原理相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 1、概述hdfs文件系统主要设计为了存储大文件的文件系统;如果有个TB级别的文件,我们该怎么存储呢?分布式文件系统未出现的时候࿰ ... [详细]
  • 各个组件confspark-env.sh配置spark的环境变量confspark-default.conf配置spark应用默认的配置项和spark-env.sh有重合之处,可在 ... [详细]
  • 两种方式实现Flink异步IO查询Mysql
    如官网所描述的Flink支持两种方式实现异步IO查询外部系统http ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • 在 Android 开发中,`android:exported` 属性用于控制组件(如 Activity、Service、BroadcastReceiver 和 ContentProvider)是否可以被其他应用组件访问或与其交互。若将此属性设为 `true`,则允许外部应用调用或与之交互;反之,若设为 `false`,则仅限于同一应用内的组件进行访问。这一属性对于确保应用的安全性和隐私保护至关重要。 ... [详细]
  • Eclipse中解决JDK源码断点调试失效的问题 ... [详细]
author-avatar
莱茵河泮的独奏_586
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有