热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink/ScalaTimeWindow处理迟到数据详解

目录一.引言二.FlinkTimeWindow丢数据示例1.代码分析2.Watermark生成逻辑3.丢失数据代码测试三.Flink处理迟到数据策略1.forBound

目录

一.引言

二.Flink TimeWindow 丢数据示例

1.代码分析

2.Watermark 生成逻辑

3.丢失数据代码测试

三.Flink 处理迟到数据策略

1.forBoundedOutOfOrderness

2.Allowed Lateness

3.SideOutputLateData

四.Flink 处理迟到数据实战

1.代码分析

2.迟到数据处理实战

3.日志分析

五.总结




一.引言

在事件时间 EventTime 语义环境下,窗口中可能出现数据迟到的情况,即 Event 时间戳小于水位线 Watermark,此时数据流未乱序流,水位线不能保证小于自己的时间戳的 Event 不会到来。这时会出现一种情况,水位线到达窗口触发时间触发窗口计算,而属于该窗口的迟到数据到来,默认情况下该类数据会被丢弃。数据的丢弃会导致窗口的计算结果不准确,因此 Flink 推出了 3 种方法处理迟到数据:


- WatermarkStrategy.forBoundedOutOfOrderness 最大延时

- Allowed Lateness 窗口延迟注销

- SideOutputLateData 侧输出流处理迟到数据



二.Flink TimeWindow 丢数据示例


1.代码分析

在介绍几种处理迟到数据方法之前,先示例一下 Flink 在正常情况下如何丢失数据导致计算不准确。以下 Demo 使用 Event 类作为 DataStream[T] 的数据类型,Event 为用户浏览 URL 的简单信息:

// 用户浏览行为
case class Event(user: String, url: String, timeStamp: Long)

def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 全局并行度env.setParallelism(1)env.socketTextStream("localhost", 9999) // A.本地 Socket 流.map(line => {val info = line.split(",")Event(info(0), info(1), info(2).toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5)) // B.延时5s.withTimestampAssigner(new SerializableTimestampAssigner[Event] {override def extractTimestamp(event: Event, l: Long): LOng= event.timeStamp})).keyBy(_.user).window(TumblingEventTimeWindows.of(Time.seconds(10))) // C.10s滚动窗口.process(new ProcessWindowFunction[Event, String, String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {val set = new mutable.HashSet[String]()elements.foreach(elem => set.add(elem.user))val start: LOng= context.window.getStartval end: LOng= context.window.getEnd// D.获取当前 WaterMarkval currentWatermark = context.currentWatermarkval count = elements.sizeval log = s"Window Start: $start End: $end CurrentWaterMark: $currentWatermark Count: $count"out.collect(log)}}).print()env.execute()}

上面是示例的代码,主要部分为 A、B、C、D:

A.本地 Socket 流:后续我们从本地开启通道模式 Event 数据写入

B.延时5s:这里表示了对迟到数据的容忍程度,生成 Watermark 时会延后5s

C.10s滚动窗口:滚动窗口的窗口开关时间固定,例如 10:10-10:20、10:20-10:30,以此类推

D.Watermark:获取当前 Watermark 并输出到下游


2.Watermark 生成逻辑

WatermarkGenerator.BoundedOutOfOrdernessWatermarks 中 onPeriodicEmit 负责周期的生成 Watermark,通过 onEvent 更新当前最大时间戳,再通过 emitWatermark 生成水印,由于引入延迟 outOfOrdernessMillis 参数,该策略会调低水位线的生成,相当于人工把钟表调慢等待迟到的数据一定时间一样,默认情况下 emit 的 period 周期为 200ms。 

Tips:

-1L:生成水位线时用了 -1L,这里和水位线的释义有关系,水位线标识小于等于自己的时间戳的 Event 都不会来了,而窗口的时间范围为左闭右开,例如 [0,10000),如果生成 10000 的水位线触发窗口,则代表小于等于 10000 时间戳的 Event 都不会到来了,此时 timestamp = 10000 的元素也会计入 [0,10000) 的窗口,因此影响数据准确性,所以这里通过 -1L 使得触发的数据范围为 [0, 9999],而窗口的起止为 [0,10000)。


3.丢失数据代码测试

在本地 terminal 内执行下述命令启动 Socket:

nc -lk 9999

Alice,1,1000
Alice,2,2000
Alice,3,15000
Alice,4,9000
Alice,3,20000
Alice,4,25000

依次输入上述信息得到如下结果: 


TimeStampMaxTimeStampWatermarkTimeWindow
10001000//
20002000//
15000150009999[0,10000) 触发
9000150009999数据迟到
200002000014999/
250002500019999[10000,20000) 触发
//LONG.MAX_VALUE[20000,30000) 触发

上面表格为每条数据输入对应的操作:

第一次触发:1000,2000 进入 [0,10000) 的窗口后,在 15000 对应 Event 达到后,水位线推进到 15000 - 5000 - 1 即 [0,10000) 窗口的触发时间,所以触发窗口计算,这时元素为 1000,2000,所以 Count = 2

第二次触发:15000 属于 [10000,20000) 窗口,在 25000 对应 Event 到达后,水位线推进到 25000 - 5000 -1 即 [10000,20000) 窗口的触发时间,所以触发窗口计算,这时元素为 15000,所以 Count = 1

第三次触发:20000,25000 属于 [20000,30000) 窗口,最终关闭 Socket 时,Flink 会发出一个 Long.MAX_VALUE = 9223372036854775807 的 WaterMark,从而触发所有窗口计算,此时只剩 [20000,30000) 窗口未触发,触发后元素为 20000,25000,所以 Count = 2

Tips:

累计 Count 为 2 + 1 + 2 = 5,而总共输入元素为 6 个,因为 9000 对应的 Event 输入的时候,属于它的窗口 [0,10000) 已经触发,所以 9000 没有在触发窗口逻辑中,这也就是乱序数据下 Flink 数据丢失的一种真实案例。


三.Flink 处理迟到数据策略

引言中提到了 Flink 3 种处理迟到数据的方法,下面一一介绍:


- WatermarkStrategy.forBoundedOutOfOrderness 最大延时

- Allowed Lateness 窗口延迟注销

- SideOutputLateData 侧输出流处理迟到数据



1.forBoundedOutOfOrderness

WatermarkStrategy 的方法属性,允许乱序流中的数据存在延迟,其中 maxOutOfOrderness 参数代表最大乱序程度,其中最关键的实现就是 onPeriodicEmit:

public void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));}

其生成的水印会根据 maxOutOfOrderness 进行延迟,例如 10000 的数据到来,正常情况下触发 [0,10000) 的窗口触发,而设置5s延迟后,必须等到 15000 的数据来,[0,10000) 才会触发,因此在5s内迟到的数据,依然会进入到属于自己的窗口并最终统一计算。这里相当于是性能和时效性的权衡,延时一段时间,但是保障了数据的相对准确性。使用时只需要在 WaterMarkStrategy 中设置即可:

WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner[Event] {override def extractTimestamp(event: Event, l: Long): LOng= event.timeStamp})


2.Allowed Lateness

虽然设置了 maxOutOfOrderness 的最大延时时间,但是还是无法避免迟到过久的数据,像上面例子中的一样,9000 比 15000 延迟了 6s,而此时 9000 对应的窗口已经触发并销毁,因此 9000 对应的数据就被舍弃了。Allowed Lateness 参数允许设定一段延迟时间,在延时的这段时间内,窗口不会注销,如果此时有迟到且属于对应窗口的数据到来,数据仍然可以进入窗口并参与计算,直到 Watermark 推进至 窗口结束时间 + Allowed Lateness 设定的延时时间,窗口才会真正销毁,这时候如果还有迟到的数据,就找不到对应窗口了。这里相当于双保险,在水印延迟生成的情况下,窗口销毁时间也延后。

val result = stream.keyBy(_.user).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.minutes(1)) // 1分钟的窗口迟到等待时间

Flink 窗口的触发相关知识可以参考:Flink - Scala/Java trigger 简介与使用。


3.SideOutputLateData

无论是延迟水印,亦或是延迟窗口销毁时间,这个时间都是有限的,程序都无法永远等下去,因此超过双保险的数据还是会被丢弃,如果不想丢掉任何一个数据,则可以增加侧输出流,该数据流负责接收超时严重被舍弃的数据,而通过 stream.getSideOutput 方法可以获取该类数据,你需要重新增加处理逻辑处理,而他们依旧无法进入到之前的正确窗口中。

val sideOutputTag: OutputTag[Event] = new OutputTag[Event]("late")val result = stream.keyBy(_.user).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.minutes(1)) // 方法2:1分钟的窗口迟到等待时间.sideOutputLateData(sideOutputTag) // 方法3: 最后迟到的数据输出到侧输出流.aggregate(new UrlViewCountAgg(), new UrlViewCountResult())result.print("result")result.getSideOutput(sideOutputTag).print("late")

使用 SideOutputLateData 需要先定义 OutputTag 标记侧输出流类型,并在 window 后添加 SideOutputLateData 选项,最终通过 getSideOutput 输出,print 内的 "late" 为该数据流标记,后续日志中可以与 result 的正常日志区分。


四.Flink 处理迟到数据实战

下面通过一个案例,同时运用上述三个方法展示 Flink EventTime + TimeWindow 如何处理迟到数据。


1.代码分析

def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 全局并行度env.setParallelism(1)val stream = env.socketTextStream("localhost", 9999).map(line => {val info = line.split(",")Event(info(0), info(1), info(2).toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2)) // 方法1:延时等待2s.withTimestampAssigner(new SerializableTimestampAssigner[Event] {override def extractTimestamp(event: Event, l: Long): LOng= event.timeStamp}))val sideOutputTag: OutputTag[Event] = new OutputTag[Event]("late")val result = stream.keyBy(_.user).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.minutes(1)) // 方法2:1分钟的窗口迟到等待时间.sideOutputLateData(sideOutputTag) // 方法3: 最后迟到的数据输出到侧输出流.aggregate(new UrlViewCountAgg(), new UrlViewCountResult())result.print("result")result.getSideOutput(sideOutputTag).print("late")// 原始数据stream.print("input")env.execute()}

上面分别使用了 2s 的水印生成延迟策略,1min 的窗口延时注销策略以及兜底的侧输出流策略保证迟到数据的处理,同时使用 "input" Tag 输出原始数据,"result" Tag 输出窗口聚合数据,"late" Tag 输出迟到过久的数据。处理逻辑中使用了 aggregate 以及对应的 ACC 与 ProcessFunction,具体代码细节请参考:Flink / Scala - Aggregate 详解与 UV、PV 统计实战。


2.迟到数据处理实战

在本地 terminal 内执行下述命令启动 Socket:

nc -lk 9999

a,1,1000
a,1,2000
a,1,10000
a,1,9000
a,1,12000
a,1,15000
a,1,9000
a,1,8000
a,1,70000
a,1,8000
a,1,72000
a,1,8000

 依次输入上述 Event 并得到下述结果:


3.日志分析

下面以 "Result" Tag 触发的日志为分界线,详细分析窗口的触发逻辑。

A.第一次触发

上面的数据与日志比较多,这里做分解的详细分析:

因为设置了 2s 的延迟,所以 [0,10000) 的窗口需要等到 12000 的元素到来,此时 Watermark 为 12000 - 2000 - 1 = 9999,所以第一次窗口触发,此时窗口元素包含 1000,2000,9000,可以看到 9000 虽然比 10000 迟到了,但是由于 2s 的延迟,使得其进入正确的窗口并触发计算。注意,这里 [0,10000) 触发后并未销毁,需要再等 1min,即水位线到达 10000 + 60000 = 70000 时,窗口才会真正销毁,不再接收迟到数据。

B.第二,三次触发

15000 后迟到数据 9000 和 8000 到达,此时水位线为 15000 - 2000 - 1 = 12999 未达到 70000,所以 [0,10000)窗口未销毁,所以 9000,8000 继续触发对应窗口计算,所以 Count 由 3 变为 5,其余日志不变。

C.第四,五次触发

 随着 70000 的到来,水位线推进至 70000 - 2000 - 1 = 67999,此时触发 [10000,20000) 的窗口,其实这里 [20000,30000),[30000,40000) 等的窗口也会被触发,但是由于没有对应元素开窗,所以只触发  [10000,20000) 内的 10000,12000,15000 计算,此时 Count 为 3。而在 70000 后迟到数据 8000 再次到来,这时 67999 未达到 70000 的窗口销毁时间,所以还能继续触发 [0,10000) 窗口的计算,此时 Count 由 5 转换为 6。

D.第六次触发

随着 72000 的到来,水位线推进至 72000 - 2000 - 1 = 69999,达到 [0,10000) 窗口注销的条件,所以 [0,10000)窗口注销,此时再有属于 [0,10000) 窗口的数据到来将不会窗口计算,而是输入到侧输出流,所以 72000 后到达的 8000,以 "late" Tag 输出。最终的触发为关闭 Socket,Flink 发出 LONG.MAX_VALUE,触发了 [70000,80000) 的窗口,此时剩余的元素 70000,72000 触发窗口计算,此时 Count = 2。


五.总结

至此,一次完整的迟到数据处理也就结束了,可以看到,迟到相对较近的元素可以通过 forBoundedOutOfOrderness 策略捕获,迟到相对较远的元素则需要通过 Allowed Lateness 捕获,二者都未捕获的,需要 SideOutputLateData 兜底并最终捕获。迟到数据的处理本质上就是精准度与效率以及资源的权衡,大家可以根据自己的业务场景与实际需求,决定每个策略的容忍度。


推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • 开发笔记:实验7的文件读写操作
    本文介绍了使用C++的ofstream和ifstream类进行文件读写操作的方法,包括创建文件、写入文件和读取文件的过程。同时还介绍了如何判断文件是否成功打开和关闭文件的方法。通过本文的学习,读者可以了解如何在C++中进行文件读写操作。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • MySQL语句大全:创建、授权、查询、修改等【MySQL】的使用方法详解
    本文详细介绍了MySQL语句的使用方法,包括创建用户、授权、查询、修改等操作。通过连接MySQL数据库,可以使用命令创建用户,并指定该用户在哪个主机上可以登录。同时,还可以设置用户的登录密码。通过本文,您可以全面了解MySQL语句的使用方法。 ... [详细]
  • 本文介绍了在使用Laravel和sqlsrv连接到SQL Server 2016时,如何在插入查询中使用输出子句,并返回所需的值。同时讨论了使用CreatedOn字段返回最近创建的行的解决方法以及使用Eloquent模型创建后,值正确插入数据库但没有返回uniqueidentifier字段的问题。最后给出了一个示例代码。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • springboot启动不了_Spring Boot + MyBatis 多模块搭建教程
    作者:枫本非凡来源:www.cnblogs.comorzlinp9717399.html一、前言1、创建父工程最近公司项目准备开始重构,框 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
  • 本文讨论了在使用Git进行版本控制时,如何提供类似CVS中自动增加版本号的功能。作者介绍了Git中的其他版本表示方式,如git describe命令,并提供了使用这些表示方式来确定文件更新情况的示例。此外,文章还介绍了启用$Id:$功能的方法,并讨论了一些开发者在使用Git时的需求和使用场景。 ... [详细]
author-avatar
renminxilu662
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有