Apache Flink是一个同时支持分布式数据流处理和数据批处理的大数据处理系统。 Flink可以表达和执行许多类别的数据处理应用程序,包括实时数据分析,连续数据管道,历史数据处理(批处理)和迭代算法(机器学习,图表分析)以及容错的数据流。
实时流处理系统(Stream Processing System)和历史数据处理(BatchProcessing System)传统上被认为是两个非常不同的类型的应用。他们使用不同的编程模型和API进行编程,并且由不同的系统执行。例如ApacheStorm、Spark Streaming等专有的实时流处理系统与Hadoop、Spark等专用的批处理系统。
然而目前越来越明显的是,当今大量的大规模数据处理应用的数据实际上是随着时间的推移而不断产生的,也就是实时流处理。这些连续的数据流来自如Web日志、应用程序日志,传感器或则一些事务日志记录。然而目前的解决方案不是把实时流数据当作流来处理,而是忽略了数据产生的连续性和及时性。数据记录(通常是人为的)分批次存入静态数据集中(例如,小时,每日或每月),然后以时间不可知的方式进行处理。像“lambda架构”这样的架构模式结合了批处理和流处理系统,以实现多种计算方式:为及时的近似结果提供了一个快速的流处理方法,并为后期的精确结果提供了一个批量离线处理方法。所有这些方法都有很高的延迟(批处理)、高复杂性(连接和协调多个系统,并实现业务逻辑两次),以及任意时间的不准确,因为时间维度没有被应用程序代码显式地处理。
Apache Flink 很好的解决了这个问题。Apache Flink遵循一种模式,将数据流处理作为统一模型,同时支持数据实时流处理和数据批处理。并且它与持久消息队列相结合,允许数据流的准任意重放(如ApacheKafka或Amazon Kinesis)。Apache Flink 将批处理程序看作是有边界的数据集,实时流数据是无边界的数据集。使用者只需要使用一个系统,它既能处理实时流数据,也能处理静态历史数据集。ApacheFlink为了使它的批处理更加具有竞争性和性能优越,Apache Flink使用一个专用的API用于处理静态数据集,并且使用专门的数据结构和算法优化批处理操作,如join或group。
图5.26 Flink的生态圈[19]
图5.26显示了Apache Flink的软件生态圈。Flink中有两个核心API:用于处理有限数据集DataSet API(通常称为批处理)和用于处 理无界数据流的DataStream API(通常称为实时流处理)。Flink的核心是DistributedStreaming Dataflow引擎,它用来执行dataflow程序。Flink的核心运行引擎可以看作是Streaming Dataflow引擎,DataSetAPI和DataStreamAPI都可以通过该引擎创建运行时程序。在核心API的基础上,Flink还绑定了用于特定于领域的库和API,目前是用于机器学习的FlinkML, 用于图处理的Gelly和用于sql的操作的Table API。从部署模式上讲,Flink支持local模式、集群模式(standalone集群或者Yarn集群)、Cloud端部署。
图5.27显示Apache Flink的分布式运行环境架构。Flink分布式程序包含2个主要的进程:JobManager和TaskManager。当程序运行时,不同的进程就会参与其中,包括JobManager、TaskManager和JobClient。
JobManager:也叫做Master进程,负责Job的管理和资源的协调。包括任务调度,监控任务的执行状态,协调任务的执行,检查点管理,失败恢复等。
TaskManager:Task Manager是具体执行tasks的worker节点,执行任务运行在一个JVM中的一个或多个线程中。TaskManager就是运行在不同节点上的JVM进程,这个进程会拥有一定的量的资源。比如内存,CPU,网络,磁盘等等。可以将进程的内存划分到多个slot上去。每个TaskSlot包括JVM进程中的一部分内存。
TaskSlots:TaskSlot是分布式程序真正执行Task的地方。通过调整TaskSlot的数量,用户可以定义子任务是如何相互隔离的。假如每个TaskManager有一个TaskSlot,意味着每个task在单独的JVM中运行(例如,可以在单独的容器中启动)。TaskManager拥有多个TaskSlot意味着更多的子任务共享相同的JVM内存。相同JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少了每个任务的开销。
JobClinet:JobClient是程序执行的入口。Job Client负责接收用户提交的程序,并将用户提交的程序通过优化器和GraphBuilder转换成Dataflow graph。然后将生成的Data flow提交给Job Manager。一旦执行完成,Job Client将返回给用户执行结果。
图5.27 Apache Flink分布式运行环境[20]
Flink具体执行流程如下。首先,Flink程序提交给JobClient,JobClient再提交到JobManager,JobManager负责资源的协调和Job的执行。一旦资源分配完成,task就会分配到不同的TaskManager,TaskManager会初始化线程去执行task,并根据程序的执行状态向JobManager反馈,执行的状态包括starting、in progress、finished以及canceled和failing等。当Job执行完成,结果会返回给客户端。
Apache Flink提供容错机制来持续恢复数据流应用程序的状态。该机制确保即使在出现故障时,程序的状态最终也会恢复到之前正常运行的状态。程序中的数据流也只会被处理一次,也就是我们常常说的ExactlyOnce。ApacheFlink容错机制不断的生成分布式数据流快照。对于程序产生状态较小的流式传输应用程序,这些快照是非常轻量级的,可以频繁的异步生成,并且对整个系统性能不会产生太大的影响。流应用程序的状态存储一般都放置在分布式文件系统中(比如HDFS)。如果程序失败(由于机器,网络或软件故障),Flink会停止发送分布式流式数据流。然后系统重新启动所有的Operator并将其重置为最新的checkpoints。输入流被重置为state snapshot中记录的 offset。由于实时流处理系统的输入流都是可以回播的输入源,比如Kafka这样的分布式消息发布订阅系统,系统恢复时,可以从最新的checkpoint所在的输入数据偏移量开始发送数据。
1)Barriers
Flink的分布式快照的核心元素是streambarriers。这些barriers被注入到数据流中,作为数据流的一部分和其他数据一同流动(正如InfoSphere的punctuation),barriers不会超过其他数据到达(乱序到达)。一个Barrier将数据流中的数据分割成两个数据集,即进入当前快照的数据和进入下一次快照的数据。每个Barrier带有一个ID,该ID为将处于该Barrier之前的数据归入快照的检查点的ID。Barrier不会打断数据流的流动,所以它是十分轻量级的。来自不同的快照的多个Barrier可以同一时间存在于同一个流中,也就是说,不同的快照可以并行同时发生。
图5.28 Apache Flink Barrier 容错机制[24]
数据流中的Barrier是在数据流的source处被插入到并行数据流图的。快照n的barrier被插入的点(成为Sn),就是在源数据流中快照n能覆盖到的数据的最近位置,如在Apache Kafaka中,这个位置就是上一个数据(record)在分区(partition)中的偏移量(offset)。这个位置Sn将会交给checkpointcoordinator(Flink的JobManager中)。
这些Barrier随数据流流动向下游,当一个中间Operator在其输入流接收到快照n的barrier时,它在其所有的输出流中都发送一个快照n的Barrier。当一个sinkoperator(流DAG的终点)从其输入流接收到n的Barrier,它将快照n通知给checkpointcoordinator。在所有Sink都通知了一个快照后,这个快照就完成了。当快照n完成后,由于数据源中先于Sn的的数据已经通过了整个data flowtopology,我们就可以确定不再需要这些数据了。
2)Apache Flink 语义
Apache Flink基于State的异步快照机制,可以做到Exactly Once的语义。也就是说Apache Flink在失败恢复的时候,也能保证输入流的数据,当且仅当被处理一次。
对齐操作可能会给流应用增加延迟(latency),通常这些额外时延都仅是毫秒级的,但也有在一些异常情况下延迟明显增长的情况。一些应用对所有数据都严格要求极低延迟(几毫秒),在这些应用中,Flink提供一个可以跳过检查点中对齐操作的开关接口。检查点快照依然将在Operator在所有输入流接收到检查点Barrier时生成。当选择跳过对齐操作时,即使Operator在一些输入流中接收到检查点n的Barrier,它仍将继续处理所有输入数据。在这种情况下,Operator在检查点n快照生成之前,也会处理属于快照n+1的数据。在恢复时,这些数据将会重复出现,因为它们既属于检查点n的状态快照,也会在检查点n之后的数据重放(replay)中出现。
由于一些特殊的应用需要极低的延迟(几毫秒),这个时候Apache Flink可以通过禁用对齐操作以提高性能,这种情况下,Apache Flink的一致性语义就是At Least Once。
Flink 数据以record是作为它的处理单元。每个record又是由一个Event产生,每个record再实时流处理系统一般都是和时间相绑定。Record一般会有以下三种时间:
1. 事件时间(EventTime),是指事件创建时的时间。这种类型时间一般会表示为事件的时间戳,再通过事件生成传感器或者事件生成服务等附到事件中。Flink 通过时间戳指定器获取事件的时间戳。
2. 摄入时间(IngestionTime),是指事件在源运算符中进入Flink的数据流的时间。
3. 处理时间(ProcessingTime),是指运算符在执行时间类操作时的本地时间。
1)Flink DataStream
Flink程序的基础构建模块是流与转换。其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。执行时,Flink程序映射到数据流,由流以及相应的Transformation构成。每一个数据流起始于一个或多个 source,并终止于一个或多个 sink。数据流类似于任意的有向无环图(DAG)。如下图所示:
图5.29 Apache Flink Streaming Dataflow[21]
Flink程序本质上是并行分布的。在执行过程中,一个流包含一个或多个流分区 ,而每一个Transformations操作包含一个或多个Transformations子任务 。操作子任务间彼此独立,以不同的线程执行,甚至有可能运行在不同的机器或容器上。
Flink包括许多Transformations操作。比如Map、FlatMap、Reduce、Window等等。Flink也由许多Sink操作,比如writeAsText、writeAsCsv、print等Sink操作。需要了解的读者可以通过阅读ApacheFlink的官方API。
2)并行数据流
本质上说,Flink 程序是分布式、并发执行的。在程序运行过程中,一个数据流可能会有一个或多个流分区,而一个运算符也可能会有一个或多个运算子任务。每个运算子任务与另外一个运算子任务之间都是相互独立的,他们是在不同的线程中运行的,甚至有可能所运行的机器或者容器都完全不同。运算子任务的数量由运算符的并发数确定。数据流的并发数就是它所生成的运算符的个数。程序中不同的运算符可以有不同等级的并发量。
图5.30 Apache Flink Programing Model[21]
streams可以在两个算子之间以one-to-one模式或者redistributing模式传递数据。
1. one-to-one模式的数据流(例如上图中Source和map()运算符之间的数据流)中元素的分组和顺序会保持不变,也就是说,map()运算符的子任务所看见的元素与 Source 运算符的子任务所生成的元素的顺序完全一致。
2. redistributing模式的数据流(例如上图中map()和keyBy/window运算符之间的数据流,以及keyby/window和Sink运算符之间的数据流)会改变数据流所在的分区。根据所选的变换的不同,每个运算子任务会将数据发送到不同的目标子任务中去。keyBy()(通过对key进行哈希计算来重分区)、boradcast()和rebalance()(随机重分区)就是重分发模式的几个例子。在重分发模式下,元素之间的先后次序在每对发送——接收子任务(例如map()的子任务和keyBy/window的子任务)中是保持不变的。因此,在上图的例子中,尽管在子任务之间每个key的顺序都是确定的,但是由于程序的并发过程引入了不确定性,最终到达 Sink 的元素顺序就不能保证与一开始的元素顺序完全一致。
Transformation |
Transform Operator描叙 |
Map |
取一个元素并生成一个元素 |
FlatMap |
获取一个元素并生成一个或多个元素。 |
Filter |
为每一个元素进行过滤操作 |
KeyBy |
逻辑上将一条流分解成不同的分区,所有具有相同key的记录都被分配到相同的分区。 |
Reduce |
将相同分区的元素进行聚合操作 |
Window |
Windows根据某些特性(例如,在最近5秒内到达的数据)对每个key中的数据进行分组, |
Split |
根据一些标准将流分成两个或更多流。 |
Select |
从一个split流中选择一个分流 |
Fold |
在具有初始值的key数据流上“滚动”折叠。 |
Aggregation |
在key数据流上滚动聚合。 |
WindowAll |
Windows根据某些特征(例如,在最近5秒内到达的数据)对所有流事件进行分组。 |
Union |
两个或更多数据流的联合创建一个包含所有流中所有元素的新流。 |
Window Apply |
将整体函数应用于窗口。 |
Window Reduce |
在窗口中应用函数进行reduce并返回reduce的值。 |
Connect |
“连接”保留其类型的两个数据流。 |
Sink |
Sink Operator 描叙 |
WriteAsText |
将数据输出为文本文件 |
WriteAsCsv |
将数据输出为Csv文件 |
打印标准输出流上每个元素的toString()值。 |
|
WriteToSocket |
根据SerializationSchema将元素写入Socket |
WriteUsingOutputFormat |
自定义文件输出的方法和基类。 支持自定义对象到字节的转换。 |
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction > { @Override public void flatMap(String sentence, Collector > out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2 (word, 1)); } } } }
计数(counts)、求和(sums)等聚合事件和批处理过程的工作模式完全不同。举个例子,由于数据流在理论上是无限的,因此直接计算数据流中的所有元素的个数基本上是无法实现的。因此,数据流的聚合操作(计数、求和等)都是由窗口(window)限定了范围的,例如“计算前五分钟的元素个数”,“对前100个元素求和”等。
窗口可以通过时间(例如以30秒为单位)或者数据(例如以100个元素为单位)来定义。有多种不同类型的窗口,例如数据不重叠的滚动窗口(tumbling window)、数据重叠的滑动窗口(slidingwindow),以及以非活动状态为间隔的会话窗口(session window)。
图5.31 Apache Flink window mechanism [21]
Flink将批处理程序作为流处理程序的特殊情况来执行,只是这种特殊的“流”是有界的,然而批处理程序这种特殊的“流”是无边界的。 DataSet在Flink系统内部被执行时视为DataFlow。上述适用于流处理程序的概念同样适用于批处理程序。Flink对于处理批处理程序,它也同样通过构建上述介绍的Dataflow数据模型。只不过Flink将批处理程序中的静态数据集视为有边界的数据集,对于实时流处理程序,Flink将实时流数据视为无边界的数据集。对于批处理程序,ApacheFlink使用一个专用的API,对于处理静态数据集,使用专门的数据结构和算法来进行批处理操作,如join或group,并且使用专用的调度策略进行优化。