热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink水位线(Watermark)

文章目录什么是水位线水位线的特性如何生成水位线Flink内置水位线生成器自定义水位线策略在自定义数据源中发送水位线水位线的总结在实际应用中,一般会采用事件时间语义。而


文章目录

  • 什么是水位线
  • 水位线的特性
  • 如何生成水位线
  • Flink 内置水位线生成器
  • 自定义水位线策略
  • 在自定义数据源中发送水位线
  • 水位线的总结

在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。一个数据产生的时刻,就是流处理中事件触发的时间点,这就是“事件时间”,一般都会以时间戳的形式作为一个字段记录在数据里。这个时间就像商品的“生产日期”一样,一旦产生就是固定的,印在包装袋上,不会因为运输辗转而变化。如果我们想要统计一段时间内的数据,需要划分时间窗口,这时只要判断一下时间戳就可以知道数据属于哪个窗口了。

在这里插入图片描述

在这个处理过程中,我们其实是基于数据的时间戳,自定义了一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。比如双十一的时候系统处理压力大,我们可能会把大量数据缓存在 Kafka中;过了高峰时段之后再读取出来,在几秒之内就可以处理完几个小时甚至几天的数据,而且依然可以按照数据产生的时间段进行统计,所有窗口都能收集到正确的数据。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。


什么是水位线

一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。


水位线的特性

水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
总结一下水位线的特性:


  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

  • 水位线是基于数据的时间戳生成的

  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

  • 水位线可以通过设置延迟,来保证正确处理乱序数据

  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据


如何生成水位线

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

具体使用时&#xff0c;直接用 DataStream 调用该方法即可&#xff0c;与普通的 transform 方法完全一样。

DataStream<Event> stream &#61; env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks &#61; stream.assignTimestampsAndWatermarks(<watermark strategy>);

这里读者可能有疑惑&#xff1a;不是说数据里已经有时间戳了吗&#xff0c;为什么这里还要“分配”呢&#xff1f;这是因为原始的时间戳只是写入日志数据的一个字段&#xff0c;如果不提取出来并明确把它分配给数据&#xff0c;
Flink 是无法知道数据真正产生的时间的。当然&#xff0c;有些时候数据源本身就提供了时间戳信息&#xff0c;比如读取 Kafka 时&#xff0c;我们就可以从 Kafka 数据中直接获取时间戳&#xff0c;而不需要单独提取字段分配了。
.assignTimestampsAndWatermarks() 方法需要传入一个 WatermarkStrategy 作为参数&#xff0c;这就是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器” TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

&#64;Public
public interface WatermarkStrategy<T>extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {// ------------------------------------------------------------------------// Methods that implementors need to implement.// ------------------------------------------------------------------------/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */&#64;OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);/*** Instantiates a {&#64;link TimestampAssigner} for assigning timestamps according to this strategy.*/&#64;Overridedefault TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {// By default, this is {&#64;link RecordTimestampAssigner},// for cases where records come out of a source with valid timestamps, for example from// Kafka.return new RecordTimestampAssigner<>();}

  • TimestampAssigner&#xff1a;主要负责从流中数据元素的某个字段中提取时间戳&#xff0c;并分配给元素。时间戳的分配是生成水位线的基础。
  • WatermarkGenerator&#xff1a;主要负责按照既定的方式&#xff0c;基于时间戳生成水位线。在WatermarkGenerator 接口中&#xff0c;主要又有两个方法&#xff1a;onEvent()和 onPeriodicEmit()。
  • onEvent&#xff1a;每个事件&#xff08;数据&#xff09;到来都会调用的方法&#xff0c;它的参数有当前事件、时间戳&#xff0c;以及允许发出水位线的一个 WatermarkOutput&#xff0c;可以基于事件做各种操作
  • onPeriodicEmit&#xff1a;周期性调用的方法&#xff0c;可以由 WatermarkOutput 发出水位线。周期时间为处理时间&#xff0c;可以调用环境配置的.setAutoWatermarkInterval()方法来设置&#xff0c;默认为200ms。

env.getConfig().setAutoWatermarkInterval(60 * 1000L);

Flink 内置水位线生成器

WatermarkStrategy 这个接口是一个生成水位线策略的抽象&#xff0c;让我们可以灵活地实现自己的需求&#xff1b;但看起来有些复杂&#xff0c;如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了我们的痛苦&#xff0c;提供了内置的水位线生成&#xff08;WatermarkGenerator&#xff09;&#xff0c;不仅开箱即用简化了编程&#xff0c;而且也为我们自定义水位线策略提供了模板。这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的&#xff0c;分别对应着处理有序流和乱序流的场景。
&#xff08;1&#xff09;有序流
对于有序流&#xff0c;主要特点就是时间戳单调增长&#xff08;Monotonously Increasing Timestamps&#xff09;&#xff0c;所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景&#xff0c;直接调用WatermarkStrategy.forMonotonousTimestamps() 方法就可以实现。简单来说&#xff0c;就是直接拿当前最大的时间戳作为水位线就可以了。

.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {&#64;Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp; //默认是毫秒}}))

上面代码中我们调用 .withTimestampAssigner() 方法&#xff0c;将数据中的 timestamp 字段提取出来&#xff0c;作为时间戳分配给数据元素&#xff1b;然后用内置的有序流水位线生成器构造出了生成策略。这样&#xff0c;提取出的数据时间戳&#xff0c;就是我们处理计算的事件时间。
这里需要注意的是&#xff0c;时间戳和水位线的单位&#xff0c;必须都是毫秒。
&#xff08;2&#xff09;乱序流
由于乱序流中需要等待迟到数据到齐&#xff0c;所以必须设置一个固定量的延迟时间&#xff08;Fixed Amount of Lateness&#xff09;。这时生成水位线的时间戳&#xff0c;就是当前数据流中最大的时间戳减去延迟的结果&#xff0c;相当于把表调慢&#xff0c;当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness() 方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数&#xff0c;表示“最大乱序程度”&#xff0c;它表示数据流中乱序数据时间戳的最大差值&#xff1b;如果我们能确定乱序程度&#xff0c;那么设置对应时间长度的延迟&#xff0c;就可以等到所有的乱序数据了。
代码示例如下&#xff1a;

public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//周期性生成watermarkTestenv.getConfig().setAutoWatermarkInterval(100);// 读取数据源&#xff0c;并行度为 1DataStream<Event> stream &#61; env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id&#61;100", 3000L),new Event("Alice", "./prod?id&#61;200", 3500L),new Event("Bob", "/prod?id&#61;2", 2500L),new Event("Alice", "./prod?id&#61;300", 3600L),new Event("Bob", "./home", 3000L),new Event("Bob", "./prod?id&#61;1", 2300L),new Event("Bob", "./prod?id&#61;3", 3300L))//乱序的Watermarks,延迟时间设置为 2s.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {// 抽取时间戳的逻辑&#64;Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));env.execute();}
}

上面代码中&#xff0c;我们同样提取了 timestamp 字段作为时间戳&#xff0c;并且以 5 秒的延迟时间创建了处理乱序流的水位线生成器。
事实上&#xff0c;有序流的水位线生成器本质上和乱序流是一样的&#xff0c;相当于延迟设为 0 的乱序流水位线生成器&#xff0c;两者完全等同&#xff1a;

WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))

这里需要注意的是&#xff0c;乱序流中生成的水位线真正的时间戳&#xff0c;其实是 当前最大时间戳 – 延迟时间 – 1&#xff0c;这里的单位是毫秒。为什么要减 1 毫秒呢&#xff1f;我们可以回想一下水位线的特点&#xff1a;时间戳为 t 的水位线&#xff0c;表示时间戳≤t 的数据全部到齐&#xff0c;不会再来了。如果考虑有序流&#xff0c;也就是延迟时间为 0 的情况&#xff0c;那么时间戳为 7 秒的数据到来时&#xff0c;之后其实是还有可能继续来 7 秒的数据的&#xff1b;所以生成的水位线不是 7 秒&#xff0c;而是 6 秒 999 毫秒&#xff0c;7 秒的数据还可以继续来。这一点可以在 BoundedOutOfOrdernessWatermarks 的源码中明显地看到&#xff1a;

在这里插入图片描述
Flink有序乱序流测试源代码&#xff1a;Flink有序乱序流测试


自定义水位线策略

一般来说&#xff0c;Flink 内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂&#xff0c;这时对水位线生成的逻辑也有更高的要求&#xff0c;我们就必须自定义实现水位线策略 WatermarkStrategy 了。
在 WatermarkStrategy 中&#xff0c;时间戳分配器 TimestampAssigner 都是大同小异的&#xff0c;指定字段提取时间戳就可以了&#xff1b;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来&#xff0c;Flink有两种不同的生成水位线的方式&#xff1a;一种是周期性的&#xff08;Periodic&#xff09;&#xff0c;另一种是断点式的&#xff08;Punctuated&#xff09;。还记得 WatermarkGenerator 接口中的两个方法吗&#xff1f;—— onEvent()onPeriodicEmit()&#xff0c;前者是在每个事件到来时调用&#xff0c;而后者由框架周期性调用。周期性调用的方法中发出水位线&#xff0c;自然就是周期性生成水位线&#xff1b;而在事件触发的方法中发出水位线&#xff0c;自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。

&#xff08;1&#xff09;周期性水位线生成器&#xff08;Periodic Generator&#xff09;
周期性生成器一般是通过 onEvent()观察判断输入的事件&#xff0c;而在 onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码&#xff1a;
周期性水位线生成器源代码

onPeriodicEmit()里调用 output.emitWatermark()&#xff0c;就可以发出水位线了&#xff1b;这个方法由系统框架周期性地调用&#xff0c;默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最大时间戳的&#xff08;这里的实现与内置生成器类似&#xff0c;也是减去延迟时间再减 1&#xff09;&#xff0c;但具体什么时候生成与数据无关。

&#xff08;2&#xff09;断点式水位线生成器&#xff08;Punctuated Generator&#xff09;
断点式生成器会不停地检测 onEvent()中的事件&#xff0c;当发现带有水位线信息的特殊事件时&#xff0c;就立即发出水位线。一般来说&#xff0c;断点式生成器不会通过 onPeriodicEmit()发出水位线。
自定义的断点式水位线生成器代码如下&#xff1a;断点式水位线生成器源代码
在 onEvent()中判断当前事件的 user 字段&#xff0c;只有遇到“Mary”这个特殊的值时&#xff0c;才调用 output.emitWatermark() 发出水位线。这个过程是完全依靠事件来触发的&#xff0c;所以水位线的生成一定在某个数据到来之后。


在自定义数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间&#xff0c;然后发送水位线。这里要注意的是&#xff0c;在自定义数据源中发送了水位线以后&#xff0c;就不能再在程序中使用assignTimestampsAndWatermarks 方法 来 生 成 水 位 线 了 。 在 自 定 义 数 据 源 中 生 成 水 位 线 和 在 程 序 中 使 用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。
示例程序如下&#xff1a;自定义数据源中发送水位线
在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活&#xff0c;可以任意的产生周期性的、非周期性的水位线&#xff0c;以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序&#xff0c;测试 Flink 的各种各样的特性。


水位线的总结

水位线在事件时间的世界里面&#xff0c;承担了时钟的角色。也就是说在事件时间的流中&#xff0c;水位线是唯一的时间尺度。
水位线是一种特殊的事件&#xff0c;由程序员通过编程插入的数据流里面&#xff0c;然后跟随数据流向下游流动。

水位线的默认计算公式&#xff1a;水位线 &#61; 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。

所以这里涉及到一个问题&#xff0c;就是不同的算子看到的水位线的大小可能是不一样的。因为下游的算子可能并未接收到来自上游算子的水位线&#xff0c;导致下游算子的时钟要落后于上游算子的时钟。比如 map->reduce 这样的操作&#xff0c;如果在 map 中编写了非常耗时间的代码&#xff0c;将会阻塞水位线的向下传播&#xff0c;因为水位线也是数据流中的一个事件&#xff0c;位于水位线前面的数据如果没有处理完毕&#xff0c;那么水位线不可能弯道超车绕过前面的数据向下游传播&#xff0c;也就是说会被前面的数据阻塞。
这样就会影响到下游算子的聚合计算&#xff0c;因为下游算子中无论由窗口聚合还是定时器的操作&#xff0c;都需要水位线才能触发执行。这也就告诉了我们&#xff0c;在编写 Flink 程序时&#xff0c;一定要谨慎的编写每一个算子的计算逻辑&#xff0c;尽量避免大量计算或者是大量的 IO 操作&#xff0c;这样才不会阻塞水位线的向下传递。

在数据流开始之前&#xff0c;Flink 会插入一个大小是负无穷大&#xff08;在 Java 中是-Long.MAX_VALUE&#xff09;的水位线&#xff0c;而在数据流结束时&#xff0c;Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线&#xff0c;保证所有的窗口闭合以及所有的定时器都被触发。对于离线数据集&#xff0c;Flink 也会将其作为流读入&#xff0c;也就是一条数据一条数据的读取。在这种情况下&#xff0c;Flink 对于离线数据集&#xff0c;只会插入两次水位线&#xff0c;也就是在最开始处插入负无穷大的水位线&#xff0c;在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线&#xff0c;就可以保证计算的正确&#xff0c;无需在数据流的中间插入水位线了。

水位线的重要性在于它的逻辑时钟特性&#xff0c;而逻辑时钟这个概念可以说是分布式系统里面最为重要的概念之一了&#xff0c;理解透彻了对理解各种分布式系统非常有帮助。具体可以参考 Leslie Lamport 的论文。


推荐阅读
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 本文总结了一些开发中常见的问题及其解决方案,包括特性过滤器的使用、NuGet程序集版本冲突、线程存储、溢出检查、ThreadPool的最大线程数设置、Redis使用中的问题以及Task.Result和Task.GetAwaiter().GetResult()的区别。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • 本文详细解析了 Android 系统启动过程中的核心文件 `init.c`,探讨了其在系统初始化阶段的关键作用。通过对 `init.c` 的源代码进行深入分析,揭示了其如何管理进程、解析配置文件以及执行系统启动脚本。此外,文章还介绍了 `init` 进程的生命周期及其与内核的交互方式,为开发者提供了深入了解 Android 启动机制的宝贵资料。 ... [详细]
  • 在Cisco IOS XR系统中,存在提供服务的服务器和使用这些服务的客户端。本文深入探讨了进程与线程状态转换机制,分析了其在系统性能优化中的关键作用,并提出了改进措施,以提高系统的响应速度和资源利用率。通过详细研究状态转换的各个环节,本文为开发人员和系统管理员提供了实用的指导,旨在提升整体系统效率和稳定性。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 每日前端实战:148# 视频教程展示纯 CSS 实现按钮两侧滑入装饰元素的悬停效果
    通过点击页面右侧的“预览”按钮,您可以直接在当前页面查看效果,或点击链接进入全屏预览模式。该视频教程展示了如何使用纯 CSS 实现按钮两侧滑入装饰元素的悬停效果。视频内容具有互动性,观众可以实时调整代码并观察变化。访问以下链接体验完整效果:https://codepen.io/comehope/pen/yRyOZr。 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 如何优化MySQL数据库性能以提升查询效率和系统稳定性 ... [详细]
  • 本文介绍了如何在iOS平台上使用GLSL着色器将YV12格式的视频帧数据转换为RGB格式,并展示了转换后的图像效果。通过详细的技术实现步骤和代码示例,读者可以轻松掌握这一过程,适用于需要进行视频处理的应用开发。 ... [详细]
  • 本文深入探讨了HTTP头部中的Expires与Cache-Control字段及其缓存机制。Cache-Control字段主要用于控制HTTP缓存行为,其在HTTP/1.1中得到了广泛应用,而HTTP/1.0中主要使用Pragma:no-cache来实现类似功能。Expires字段则定义了资源的过期时间,帮助浏览器决定是否从缓存中读取资源。文章详细解析了这两个字段的具体用法、相互关系以及在不同场景下的应用效果,为开发者提供了全面的缓存管理指南。 ... [详细]
author-avatar
无谓__
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有