热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkWatermark

Flin

Event Time & Processing Time

  • Event Time:事件创建的时间

  • Processing Time:执行操作算子的当前机器的本地时间


官网权威解释可以参考

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#notions-of-time-event-time-and-processing-time

真实业务场景中,我们往往更关心事件时间(Event Time),Flink 从 1.12 起流的时间特性默认设置为 TimeCharacteristic.EventTime



Watermark

当 Flink 以 Event Time 模式处理数据流时,会根据数据里的时间戳来处理基于时间的算子,通常系统由于网络抖动、分布式架构等原因,会导致乱序数据的产生,从而导致窗口计算不精确。

Fink 为了避免乱序数据带来的窗口计算不精确的问题,引入了 Watermark 机制。

  • Watermark 用于标记 Event Time 的前进过程

  • Watermark 跟随 DataStream Event Time 变动,并自身携带 TimeStamp

  • Watermark 用于表明所有较早的事件已经(可能)到达

  • Watermark 本身也属于特殊的事件


官网权威解释可以参考

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#event-time-and-watermarks

在 Flink 中,Watermark 由应用程序开发人员生成,这通常需要开发人员对业务的上下游数据乱序的程度有一定的了解;如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果;而如果 Watermark 到达的太早,则可能收到错误结果,不过可以通过 Flink 处理迟到数据的机制来解决这个问题。


Demo

Maven Dependency


<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0modelVersion>


<groupId>org.foolgroupId>
<artifactId>flinkartifactId>
<version>1.0-SNAPSHOTversion>


<properties>
<maven.compiler.source>8maven.compiler.source>
<maven.compiler.target>8maven.compiler.target>
properties>


<dependencies>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.12.5version>
dependency>


<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_2.12artifactId>
<version>1.12.5version>
dependency>


<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_2.12artifactId>
<version>1.12.5version>
dependency>


<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.12artifactId>
<version>1.12.5version>
dependency>


<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-elasticsearch7_2.12artifactId>
<version>1.12.5version>
dependency>


<dependency>
<groupId>org.apache.bahirgroupId>
<artifactId>flink-connector-redis_2.11artifactId>
<version>1.0version>
dependency>


<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<version>1.18.20version>
dependency>


<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>8.0.26version>
dependency>
dependencies>


project>


SRC

src/main/java/org/fool/flink/contract/Sensor.java

package org.fool.flink.contract;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@NoArgsConstructor
@AllArgsConstructor
public class Sensor {
private String id;
private Long timestamp;
private Double temperature;
}


src/main/java/org/fool/flink/window/WindowWatermarkTest.java

package org.fool.flink.window;


import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import org.fool.flink.contract.Sensor;


public class WindowWatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment envirOnment= StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// environment.setParallelism(4);


DataStream inputStream = environment.socketTextStream("localhost", 7878);


DataStream dataStream = inputStream.map(new MapFunction() {
@Override
public Sensor map(String value) throws Exception {
String[] fields = value.split(",");
return new Sensor(fields[0], new Long(fields[1]), new Double(fields[2]));
}
}).assignTimestampsAndWatermarks(new WatermarkStrategy() {
@Override
public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator() {
private final long maxOutOfOrderness = 2000; // 2 seconds


private long currentMaxTimestamp;


@Override
public void onEvent(Sensor sensor, long eventTimestamp, WatermarkOutput output) {
// System.out.println("sensor.getTimestamp(): " + sensor.getTimestamp() * 1000L);
// System.out.println("eventTimestamp: " + eventTimestamp);
currentMaxTimestamp = Math.max(sensor.getTimestamp() * 1000L, eventTimestamp);
// System.out.println("currentMaxTimestamp1: " + currentMaxTimestamp);
}


@Override
public void onPeriodicEmit(WatermarkOutput output) {
// System.out.println("currentMaxTimestamp2: " + currentMaxTimestamp);
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
};
}
}.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Sensor sensor, long recordTimestamp) {
return sensor.getTimestamp() * 1000L;
}
}));


OutputTag lateTag = new OutputTag<>("late", TypeInformation.of(Sensor.class));


SingleOutputStreamOperator minStream = dataStream.keyBy(new KeySelector() {
@Override
public String getKey(Sensor sensor) throws Exception {
return sensor.getId();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(15)))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateTag)
.minBy("temperature");


minStream.print("min temp");


minStream.getSideOutput(lateTag).print("late");
environment.execute();
}


}

Note: 当前并行度是 1,Watermark 设置为 2 秒

environment.setParallelism(1);


Run

Socket Input

1,1628754405,35.8
1,1628754420,34.8
1,1628754422,33.8

Note:1628754422 这个时间点会触发窗口 [05, 20) 这个窗口计算


Console Output

min temp> Sensor(id=1, timestamp=1628754405, temperature=35.8)


Socket Input

1,1628754406,30.8
1,1628754407,31.8

Note:在 1628754422 这个时间点后继续输入, 1628754406、1628754407 后仍旧会触发窗口计算


Console Output

min temp> Sensor(id=1, timestamp=1628754406, temperature=30.8)
min temp> Sensor(id=1, timestamp=1628754406, temperature=30.8)

Note:因为设置了 1 分钟的 allowedLateness,1628754406、1628754407 这两个迟到的事件在 [05, 20) 这个窗口已经触发过计算后仍旧会触发窗口计算

allowedLateness(Time.minutes(1))

 

Socket Input

1,1628754482,28.8

Note:在 1628754407 这个时间点后继续输入


Console Output

min temp> Sensor(id=1, timestamp=1628754422, temperature=33.8)

Note:1628754482 这个时间点,1 分钟的 allowedLateness 的窗口会关闭,触发窗口计算


Socket Input

1,1628754411,30.3
1,1628754412,31.3

Note:在 1628754482 这个时间点后继续输入,即 1 分钟的 allowedLateness 的窗口已经关闭


Console Output

late> Sensor(id=1, timestamp=1628754411, temperature=30.3)
late> Sensor(id=1, timestamp=1628754412, temperature=31.3)

Note:1 分钟的 allowedLateness 的窗口关闭后,1628754411、1628754412 这两个迟到的事件会进入 side output


完整的 Socket Input


完整的 Console Output


Key Point

以上操作都是基于并行度为 1 的情况下进行的,当设置的并行度不为 1 时,比如设置并行度为 4,结果会不一样。

environment.setParallelism(4);

并行度不为 1 的时候,测试输出的时候,Watermark 在上下游任务之间传递的规则:必须是每一个分区的 Watermark 都要上升,取所有分区中最小的值才是当前的 Watermark,才会触发窗口聚合计算。


Socket Input

Note:4 个分区的 Watermark 都到了 1628754422,才会触发窗口聚合计算


Console Output


Reference

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/event-time/generating_watermarks/




泰克风格 只讲干货 不弄玄虚



推荐阅读
  • 在处理遗留数据库的映射时,反向工程是一个重要的初始步骤。由于实体模式已经在数据库系统中存在,Hibernate 提供了自动化工具来简化这一过程,帮助开发人员快速生成持久化类和映射文件。通过反向工程,可以显著提高开发效率并减少手动配置的错误。此外,该工具还支持对现有数据库结构进行分析,自动生成符合 Hibernate 规范的配置文件,从而加速项目的启动和开发周期。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
  • 本文探讨了资源访问的学习路径与方法,旨在帮助学习者更高效地获取和利用各类资源。通过分析不同资源的特点和应用场景,提出了多种实用的学习策略和技术手段,为学习者提供了系统的指导和建议。 ... [详细]
  • Spring框架中的面向切面编程(AOP)技术详解
    面向切面编程(AOP)是Spring框架中的关键技术之一,它通过将横切关注点从业务逻辑中分离出来,实现了代码的模块化和重用。AOP的核心思想是将程序运行过程中需要多次处理的功能(如日志记录、事务管理等)封装成独立的模块,即切面,并在特定的连接点(如方法调用)动态地应用这些切面。这种方式不仅提高了代码的可维护性和可读性,还简化了业务逻辑的实现。Spring AOP利用代理机制,在不修改原有代码的基础上,实现了对目标对象的增强。 ... [详细]
  • 本文探讨了利用Java实现WebSocket实时消息推送技术的方法。与传统的轮询、长连接或短连接等方案相比,WebSocket提供了一种更为高效和低延迟的双向通信机制。通过建立持久连接,服务器能够主动向客户端推送数据,从而实现真正的实时消息传递。此外,本文还介绍了WebSocket在实际应用中的优势和应用场景,并提供了详细的实现步骤和技术细节。 ... [详细]
  • 掌握Android UI设计:利用ZoomControls实现图片缩放功能
    本文介绍了如何在Android应用中通过使用ZoomControls组件来实现图片的缩放功能。ZoomControls提供了一种简单且直观的方式,让用户可以通过点击放大和缩小按钮来调整图片的显示大小。文章详细讲解了ZoomControls的基本用法、布局设置以及与ImageView的结合使用方法,适合初学者快速掌握Android UI设计中的这一重要功能。 ... [详细]
  • 技术分享:深入解析GestureDetector手势识别机制
    技术分享:深入解析GestureDetector手势识别机制 ... [详细]
  • 深入解析 Android 选择器与形状绘制技术
    本文深入探讨了 Android 中选择器(Selector)与形状绘制(Shape Drawing)技术的应用与实现。重点分析了 `Selector` 的 `item` 元素,其中包括 `android:drawable` 属性的使用方法及其在不同状态下的表现。此外,还详细介绍了如何通过 XML 定义复杂的形状和渐变效果,以提升 UI 设计的灵活性和美观性。 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文详细介绍了在CentOS 6.5 64位系统上使用阿里云ECS服务器搭建LAMP环境的具体步骤。首先,通过PuTTY工具实现远程连接至服务器。接着,检查当前系统的磁盘空间使用情况,确保有足够的空间进行后续操作,可使用 `df` 命令进行查看。此外,文章还涵盖了安装和配置Apache、MySQL和PHP的相关步骤,以及常见问题的解决方法,帮助用户顺利完成LAMP环境的搭建。 ... [详细]
  • 本文深入探讨了如何利用Maven高效管理项目中的外部依赖库。通过介绍Maven的官方依赖搜索地址(),详细讲解了依赖库的添加、版本管理和冲突解决等关键操作。此外,还提供了实用的配置示例和最佳实践,帮助开发者优化项目构建流程,提高开发效率。 ... [详细]
  • 本文深入探讨了CGLIB BeanCopier在Bean对象复制中的应用及其优化技巧。相较于Spring的BeanUtils和Apache的BeanUtils,CGLIB BeanCopier在性能上具有显著优势。通过详细分析其内部机制和使用场景,本文提供了多种优化方法,帮助开发者在实际项目中更高效地利用这一工具。此外,文章还讨论了CGLIB BeanCopier在复杂对象结构和大规模数据处理中的表现,为读者提供了实用的参考和建议。 ... [详细]
author-avatar
常山他爹没有JJ2000_836
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有