watermark(水位线)是Flink里边的相当重要的存在,是Flink处理乱序数据的重要组成。
Flink理论上定义了三种watermark广播机制(【2种生成水印的策略】【如果生成的watermark是null,或者小于之前的watermark,则该watermark不会发往下游】)。
首先看看它的组成:
AssignerWithPunctuatedWatermarks(为每条消息都会尝试生成水印)
AssignerWithPeriodicWatermarks(周期性的生成水印,不会针对每条消息都生成)(常用)
还有第三种策略是 无为策略:不设定watermark策略。
这两种策略的源码分析参考https://www.cnblogs.com/ljygz/p/11435243.html
AssignerWithPeriodicWatermarks 实例:
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {//定义watermark乱序等待时间2s,即允许数据的最大乱序时间private long maxOutofOrderness = 2 * 1000;// 观察到的最大时间戳private long currentMaxTs = Long.MIN_VALUE;@Nullablepublic Watermark getCurrentWatermark() {//生成具有2s容忍度的水位线return new Watermark(currentMaxTs-maxOutofOrderness );}// 先调用该函数 previousElementTimestamp代表public long extractTimestamp(Tuple2 element, long previousElementTimestamp) {long currentTime = previousElementTimestamp;// 更新最大的时间戳currentTime = Math.max(currentMaxTs, currentTime);// 返回记录的时间戳return currentTime;}})
再看看实现接口的几个抽象类:
关于AscendingTimestampExtractor,一般是在数据集的时间戳是单调递增的且没有乱序时使用,该方法使用当前的时间戳生成水位线
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {@Overridepublic long extractAscendingTimestamp(UserBehavior element) {return element.timestamp*1000;}});
关于BoundedOutOfOrdernessTimestampExtractor,是在数据集中存在乱序数据的情况下使用,即数据有延迟(任意新到来的元素与已经到来的时间戳最大的元素之间的时间差),这种方式可以接收一个表示最大预期延迟参数,具体如下:
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10)) {@Overridepublic long extractTimestamp(UserBehavior element) {return element.timestamp*1000;}} );