热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

4.1.9Flink流处理框架Flink流处理API之Environment

目录1.写在前面2.Environment:执行环境2.1getExecutionEnvironment:得到执行环境(常用方法&#

目录

1.写在前面

2.Environment:执行环境

2.1 getExecutionEnvironment:得到执行环境(常用方法)

2.2 createLocalEnvironment:创建本地执行环境

2.3 createRemoteEnvironment:创建远程执行环境

3.执行模式(Flink 1.12.0)

3.1 流执行模式(STREAMING)

3.2 批执行模式(BATCH)

3.3 自动模式(AUTOMATIC)

4.触发程序执行



1.写在前面

        Flink 有非常灵活的分层 API 设计,其中的核心层就是 DataStream/DataSet API。由于新版 本已经实现了流批一体,DataSet API 将被弃用,官方推荐统一使用 DataStream API 处理流数 据和批数据。我们主要研究dataStream的api,dataSet与之类似。

        DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心 API 就以 DataStream 命名。对于 批处理和流处理,我们都可以用这同一套 API 来实现。 DataStream 在用法上有些类似于常规的 Java 集合,但又有所不同。我们在代码中往往并 不关心集合中具体的数据,而只是用 API 定义出一连串的操作来处理它们;这就叫作数据流 的“转换”(transformations)。

        一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几 69 部分构成。source用来读取数据源,transform做转换计算,sink阶段主要用来输出计算数据。在这三个步骤之前,我们需要创建flink的执行环境。

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformations)
  • 定义计算结果的输出位置(sink)
  • 触发程序执行(execute) 


2.Environment:执行环境

        Flink 程序可以在各种上下文环境中运行:我们可以在本地 JVM 中执行程序,也可以提交 到远程集群上运行。 不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时, 首先必须获取当前 Flink 的运行环境,从而建立起与 Flink 框架之间的联系。只有获取了环境 上下文信息,才能将具体的任务调度到不同的 TaskManager 执行。

2.1 getExecutionEnvironment:得到执行环境(常用方法)

        创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则 此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

//批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//流处理
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1,当然我们也可以设置并行度。 

2.2 createLocalEnvironment:创建本地执行环境

        返回本地执行环境,需要在调用时指定默认的并行度。如果不传入,则默认并行度就是本地的 CPU 核心数。

//流处理
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

2.3 createRemoteEnvironment:创建远程执行环境

        返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager 的 IP 和端口号,并指定要在集群中运行的 Jar 包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//WordCount.jar");

        在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程 序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

3.执行模式(Flink 1.12.0)

        从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。

3.1 流执行模式(STREAMING)

        这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情 71 况下,程序使用的就是 STREAMING 执行模式。

3.2 批执行模式(BATCH)

        专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据,我们用这种模式处理会更方便。

        Flink 程序默认是 STREAMING 模式,那么怎么使用batch模式呢?有以下两种方式:

(1)通过命令行配置,在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。

bin/flink run -Dexecution.runtime-mode=BATCH ...

(2) 通过代码配置,推荐,因为扩展性更好,更加灵活

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

3.3 自动模式(AUTOMATIC)

在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。、

4.触发程序执行

        有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种 转换操作,最后输出结果到外部系统。 需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当 main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据 ——因为数据可能还没来。

        Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算, 这也被称为“延迟执行”或“懒执行”(lazy execution)。 所以我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一 直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();


推荐阅读
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Spring框架的核心组件与架构解析 ... [详细]
  • Android中将独立SO库封装进JAR包并实现SO库的加载与调用
    在Android开发中,将独立的SO库封装进JAR包并实现其加载与调用是一个常见的需求。本文详细介绍了如何将SO库嵌入到JAR包中,并确保在外部应用调用该JAR包时能够正确加载和使用这些SO库。通过这种方式,开发者可以更方便地管理和分发包含原生代码的库文件,提高开发效率和代码复用性。文章还探讨了常见的问题及其解决方案,帮助开发者避免在实际应用中遇到的坑。 ... [详细]
  • 本文介绍了如何在iOS平台上使用GLSL着色器将YV12格式的视频帧数据转换为RGB格式,并展示了转换后的图像效果。通过详细的技术实现步骤和代码示例,读者可以轻松掌握这一过程,适用于需要进行视频处理的应用开发。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 深入解析Android 4.4中的Fence机制及其应用
    在Android 4.4中,Fence机制是处理缓冲区交换和同步问题的关键技术。该机制广泛应用于生产者-消费者模式中,确保了不同组件之间高效、安全的数据传输。通过深入解析Fence机制的工作原理和应用场景,本文探讨了其在系统性能优化和资源管理中的重要作用。 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • 在最近的WWDC17大会上,苹果公司宣布了多项重要更新,其中一项是macOS High Sierra 10.13 Final的正式发布。这一版本经过优化,显著提升了系统的稳定性和响应速度,为用户在任何Mac设备上提供了更加流畅的使用体验。本文将详细介绍如何在Windows系统中利用VMware虚拟机软件安装并运行macOS High Sierra 10.13 Final,帮助用户在非苹果硬件上体验这一先进操作系统。 ... [详细]
  • 本文深入探讨了WinRing0及其源代码实现,详细解析了如何通过获取Ring0权限在应用程序中直接执行需要Ring0权限的CPU指令。此外,文章还提供了实例截图和核心代码示例,帮助读者更好地理解和应用这一技术。 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • PHP预处理常量详解:如何定义与使用常量 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 在多年使用Java 8进行新应用开发和现有应用迁移的过程中,我总结了一些非常实用的技术技巧。虽然我不赞同“最佳实践”这一术语,因为它可能暗示了通用的解决方案,但这些技巧在实际项目中确实能够显著提升开发效率和代码质量。本文将深入解析并探讨这四大高级技巧的具体应用,帮助开发者更好地利用Java 8的强大功能。 ... [详细]
  • 利用ZFS和Gluster实现分布式存储系统的高效迁移与应用
    本文探讨了在Ubuntu 18.04系统中利用ZFS和Gluster文件系统实现分布式存储系统的高效迁移与应用。通过详细的技术分析和实践案例,展示了这两种文件系统在数据迁移、高可用性和性能优化方面的优势,为分布式存储系统的部署和管理提供了宝贵的参考。 ... [详细]
  • Python全局解释器锁(GIL)机制详解
    在Python中,线程是操作系统级别的原生线程。为了确保多线程环境下的内存安全,Python虚拟机引入了全局解释器锁(Global Interpreter Lock,简称GIL)。GIL是一种互斥锁,用于保护对解释器状态的访问,防止多个线程同时执行字节码。尽管GIL有助于简化内存管理,但它也限制了多核处理器上多线程程序的并行性能。本文将深入探讨GIL的工作原理及其对Python多线程编程的影响。 ... [详细]
author-avatar
朱志铭贤雯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有