热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink:时间和窗口

时间语义在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统中的Source算子读取,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理

时间语义

时间语义

在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。



  • 处理时间(Processing Time):执行处理操作的机器系统时间,是最简单的时间语义。

  • 事件时间(Event Time):数据生成时间,“时间戳”(Timestamp)。

在事件时间语义下,我们对于时间的衡量是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中数据流是乱序的。在这种情況下,就不能简单地把数据自带的时间戳当作时钟了,而是需要用另外的标志来表示事件时间的进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。在实际中,我们更关系事件时间。


水位线

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。

而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。



  • 水位线是插入到数据流中的一个标记

  • 水位线的主要内容是时间戳

  • 水位线的时间戳必须是递增的

  • 水位线可以设置延迟

  • 水位线 Watermarks(t) 表示当前流中事件时间已经达到 t,表示 t 之前的数据都已经到达了。


有序流中的水位线

在理想状态下,数据处理的过程会保持原先的顺序,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳可以保证总是从小到大的,水位线也是不断增长的,事件时钟也不断向前推进。

在实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时候每来一条数据就提取时间戳,插入水位线,就会做很多无用功。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳。


乱序流中的水位线

“乱序”(out-of-order)是指数据的先后顺序不一致,主要是基于数据的产 生时间而言的。

插入新的水位线时,要先判断一下当前时间戳是否比之前的大,否则就不再生成新的水位线。

如果要考虑到大量数据同时到来的处理效率,我们可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线。

但是周期性会带来一个问题,我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间。也就是用当前己有数据的最大时间戳减去等待时间,就是要插入的水位线的时间戳。


水位线生成原则

Flink 中的水位线是流处理中对低延迟和结果正确性的一个权衡机制。

在 DataStream API 中,有一个单独用于生成水位线的方法 assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

public SingleOutputStreamOperator assignTimestampsAndWatermarks(WatermarkStrategy watermarkStrategy)

该方法需要传入水位线生成策略 WatermarkStrategy,它包含了时间戳分配器(TimestampAssigner)和水位线生成器(WatermarkGenerator)。

public interface WatermarkStrategy extends TimestampAssignerSupplier, WatermarkGeneratorSupplier {
@Override
public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return this.assigner;
}
@Override
WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context var1);
}


  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给该元素。

  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在 WatermarkGenerator 接口中,主要有 onEvent()onPredicEmit() 两个方法。

  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作。

  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的 setAutoWatermarkInterva()方法来设置,默认为 200ms。

// 有序流的Watermark生成
DataStream stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Bob", "./home", 3000L),
new Event("Mary", "./cart", 2000L))
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
// 无序流的Watermark生成
DataStream stream2 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Bob", "./home", 3000L),
new Event("Mary", "./cart", 2000L))
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);

水位线的传递

水位线传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。


窗口

概念

想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的心数据块”进行处理,这就是所谓的“窗口”。

在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗 口。相比之下,我们应该把窗口理解成一个“桶”,窗口可以把流切割成有限大小的多个“存储桶”(bucket)。每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

桶

Flink 中窗口并不是静态准备好的,而是动态创建的。当有落在这个窗口区间范围的数据到达时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭。


分类


按照驱动类型分类



  • 时间窗口(Time Window):按照时间段去截取数据的窗口。

  • 计数窗口(CountWindow):按照固定的个数,来截取了段数据集。


按照窗口分配数据的规则分类



  • 滚动窗口(Tumbling Windows):有固定的大小,对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。每个数据都会被分配到一个窗口,而且只会属于一个窗口。滚动窗口可以基于时间定义,也可以基于数据个数定义。只需要窗口大小一个参数。

  • 滑动窗口(Sliding Windows):滑动窗口的大小是固定的。但是,窗口之间并不是“首尾相接”的,而是可以“错开”一定的位置。有窗口大小和滑动步长两个参数,滑动步长代表了窗口计算的频率。滑动的距离代表了下一个窗口开始的时间间隔,而窗口大小是固定的。

  • 会话窗口(Session Windows):最重要的参数就是会话的超时时间。如果相邻两个数据到来的时间间隔小于指定大小,说明还在保持会话,它们就属于同一个窗口。反之,则认为新来的数据属于新的会话(另一个窗口)。

  • 全局窗口(Global Windows):全局有效,会把相同 key 的所有数据都分配到同一个窗口中。


API


按键分区和非按键分区

区别:在调用窗口算子之前,是否有 keyBy 操作。



  • 按键分区窗口(Keyed Windows):经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流,这 就是 KeyedStream。基于 KeyedStrearn 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。

stream.keyBy().window()


  • 非按键分区(Non-Keyed Windows):如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务上执行,就相当于并行度变成了1。

stream.windowAll()

API 的调用

窗口操作主要有两个部分:窗口分配器和窗口函数。

stream.keyBy()
// 指明了窗口的类型
.window()
// 定义窗口具体的处理逻辑
.aggregate()

窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。



  • 窗口分配器最通用的定义方式,就是调用 window()方法,需要传入一个 WindowAssigner 作为参数,返回WindowedStream

  • 如果是非按键分区窗口,那么直接调用 windowAll() 方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream

// 滑动计数:第一个参数计数个数,第二个参数是滑动个数大小
stream.keyBy(event -> event.user).countWindow(10, 2);
// 滚动事件时间窗口
stream.keyBy(event -> event.user).window(TumblingEventTimeWindows.of(Time.hours(1)));
// 滑动事件时间窗口:第一个参数时间窗口大小,第二个参数是滑动时间大小
stream.keyBy(event -> event.user).window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));
// 事件时间会话窗口
stream.keyBy(event -> event.user).window(EventTimeSessionWindows.withGap(Time.seconds(2)));

窗口函数


增量聚合函数

窗口将数据收集起来,最基本的处理操作就是进行聚合。每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。



  • 归约函数(ReduceFunction)

SingleOutputStreamOperator stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.map(event -> Tuple2.of(event.user, 1L))
.returns(new TypeHint>() {})
.keyBy(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce((t0, t1) -> Tuple2.of(t0.f0, t0.f1 + t1.f1))
.print();


  • 聚合函数(AggregateFunction):输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

    • createAccumulator():创建一个累加器,就是为聚合创建一个初始状态,每个聚合任务只调用一次。

    • add():将输入的元素添加到累加器中。

    • getResult():从累加器中提取聚合的输出结果。

    • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。



stream.keyBy(event -> event.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new AggregateFunction, String>() {
@Override
public Tuple2 createAccumulator() {
return Tuple2.of(0L, 0);
}
@Override
public Tuple2 add(Event event, Tuple2 longIntegerTuple2) {
return Tuple2.of(longIntegerTuple2.f0 + event.timestamp, longIntegerTuple2.f1 + 1);
}
@Override
public String getResult(Tuple2 longIntegerTuple2) {
Timestamp timestamp = new Timestamp(longIntegerTuple2.f0 / longIntegerTuple2.f1);
return timestamp.toString();
}
@Override
public Tuple2 merge(Tuple2 longIntegerTuple2, Tuple2 acc1) {
return Tuple2.of(longIntegerTuple2.f0 + acc1.f0, longIntegerTuple2.f1 + acc1.f1);
}
}).print();

全窗口函数

与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。



  • 窗口函数:以基于 WindowedStream 调用 apply 方法,传入一个 WindowFunction 的实现类。

  • 处理窗口函数:Window API 中最底层的通用窗口函数接口,除了可以拿到窗口中的所有数据之外,还可以获取到一个“上下文对象”。

stream.keyBy(event -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UVCountByWindow())
.print();

// 自定义ProcessWindowFunction
public static class UVCountByWindow extends ProcessWindowFunction{
@Override
public void process(Boolean aBoolean, ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception {
HashSet userSet = new HashSet<>();
for(Event event: iterable) {
userSet.add(event.user);
}
Integer uv = userSet.size();
// 结合窗口信息输出
Long start = context.window().getStart();
Long end = context.window().getEnd();
collector.collect("窗口 " + new Timestamp(start) + "-" + new Timestamp(end) + "UV=" + uv);
}
}

其他 API


触发器

触发器主要是用来控制窗口什么时候触发计算(执行窗口函数)。

stream.keyBy()
// 指明了窗口的类型
.window()
// 定义自定义触发器
.trigger(new MyTrigger())

Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:



  • onElement():窗口中每到来一个元素,都会调用这个方法。

  • onEventTime():当注册的事件时间计时器触发时,将调用这个方法。

  • onProcessingTime():当注册的处理时间计时器触发时,将调用这个方法。

  • clear():当窗口关闭销毁时,调用这个方法,一般用来清除自定义的状态。

前三个方法返回的都是 TriggerResult 类型,这是一个枚举类型,其中定义了对窗口进行操作的四种类型。



  • CONTINUE:什么都不做

  • FIRE_AND_PURGE:触发计算,输出结果,销毁窗口

  • FIRE:触发计算,输出结果

  • PURGE:清空窗口中所有的数据,销毁窗口


移除器

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用 evictor 方法,就可以传入一个自定义的移除器。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy()
// 指明了窗口的类型
.window()
// 定义自定义移除器
.trigger(new MyEvictor())

Evictor 接口定义了两个接口:



  • evictBefore():定义执行窗口函数之前的移除数据操作

  • evictAfter():定义执行窗口函数之后的移除数据操作

默认情况下,预实现的移除器都是在执行窗口函数之前移除数据的。


允许延迟

在事件时间语义下,窗口中可能会出现数据迟到的情況,因此,Flink 提供了一个特殊的接口,可以为窗口算子设置一个允许的最大延迟。

基于 WindowedStream 调用 allowedLateness() 方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream.keyBy()
// 指明了窗口的类型
.window(TumblingEventTimeWindows.of(Time.hours(1)))
// 允许延迟1分钟
.allowedLateness(Time.minutes(1))

侧输出流

另一种处理迟到数据的方法:将迟到的数据存入到侧输出流中。

基于 WindowedStream 调用 sideOutputLateData() 方法,该方法需要传入一个输出标签,用来标记分支的迟到数据。

OutputTag outputTag = new OutputTag("late"){};
stream.keyBy()
// 指明了窗口的类型
.window(TumblingEventTimeWindows.of(Time.hours(1)))
// 侧输出流
.sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的 DataStream,调用 getSideoutput() 方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

DataStream lateStream = Stream.getSideOutput(outputTag);


推荐阅读
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 2018年人工智能大数据的爆发,学Java还是Python?
    本文介绍了2018年人工智能大数据的爆发以及学习Java和Python的相关知识。在人工智能和大数据时代,Java和Python这两门编程语言都很优秀且火爆。选择学习哪门语言要根据个人兴趣爱好来决定。Python是一门拥有简洁语法的高级编程语言,容易上手。其特色之一是强制使用空白符作为语句缩进,使得新手可以快速上手。目前,Python在人工智能领域有着广泛的应用。如果对Java、Python或大数据感兴趣,欢迎加入qq群458345782。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • Monkey《大话移动——Android与iOS应用测试指南》的预购信息发布啦!
    Monkey《大话移动——Android与iOS应用测试指南》的预购信息已经发布,可以在京东和当当网进行预购。感谢几位大牛给出的书评,并呼吁大家的支持。明天京东的链接也将发布。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • 本文介绍了使用PHP实现断点续传乱序合并文件的方法和源码。由于网络原因,文件需要分割成多个部分发送,因此无法按顺序接收。文章中提供了merge2.php的源码,通过使用shuffle函数打乱文件读取顺序,实现了乱序合并文件的功能。同时,还介绍了filesize、glob、unlink、fopen等相关函数的使用。阅读本文可以了解如何使用PHP实现断点续传乱序合并文件的具体步骤。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • JSP内置对象之application的作用范围和获取方式
    本文介绍了JSP内置对象之application的作用时间范围、可以在不同浏览器获取的特点,以及获取application对象的方法。通过示例代码展示了在JSP中设置和在servlet中获取application对象的步骤。对于学习JSP内置对象的读者来说,本文具有一定的参考价值。摘要长度为163字。 ... [详细]
  • RouterOS 5.16软路由安装图解教程
    本文介绍了如何安装RouterOS 5.16软路由系统,包括系统要求、安装步骤和登录方式。同时提供了详细的图解教程,方便读者进行操作。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
author-avatar
风让我离开
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有