Preface
This article takes a look at Flink's Sliding Window.
SlidingEventTimeWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;
    private final long slide;
    private final long offset;

    protected SlidingEventTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset ...
    // ...
}
- SlidingEventTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It takes three parameters, size, slide, and offset: offset must be greater than or equal to 0 and less than slide, and size must be greater than 0.
- The assignWindows method passes slide as the window size to TimeWindow.getWindowStartWithOffset(timestamp, offset, slide) to compute lastStart. It then loops while start + size > timestamp, subtracting slide from start on each iteration and creating a TimeWindow(start, start + size) each time. getDefaultTrigger returns an EventTimeTrigger, getWindowSerializer returns a TimeWindow.Serializer(), and isEventTime returns true.
- SlidingEventTimeWindows provides the static factory method of, which accepts size, slide, and offset. It converts the offset argument to milliseconds and takes it modulo slide.toMilliseconds() to obtain the final offset.
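The assignment loop described above can be illustrated with a small plain-JDK sketch. It does not use any Flink classes: the class name `SlidingAssignSketch`, the `long[] {start, end}` representation of a window, and the exception message are made up for illustration, and the start-of-window formula is what this sketch assumes `TimeWindow.getWindowStartWithOffset` computes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-JDK sketch of the sliding-window assignment arithmetic; not a Flink class.
final class SlidingAssignSketch {

    // Assumed formula for the start of the window that contains `timestamp`.
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Returns every [start, end) window of length `size` that contains `timestamp`,
    // newest window first.
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        // Same constraint as the constructor check quoted in the article.
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("need 0 <= offset < slide and size > 0");
        }
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // The slide, not the size, is used as the window size here.
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start + size > timestamp; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5, offset = 0, timestamp = 12
        for (long[] w : assignWindows(12L, 10L, 5L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With size 10 and slide 5, the timestamp 12 falls into two windows, `[10, 20)` and `[5, 15)`, which matches the intuition that each element belongs to roughly size / slide windows.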
SlidingProcessingTimeWindows
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;
    private final long offset;
    private final long slide;

    private SlidingProcessingTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset ...
    // ...
}
- SlidingProcessingTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It takes three parameters, size, slide, and offset: offset must be greater than or equal to 0 and less than slide, and size must be greater than 0.
- The assignWindows method passes slide as the window size to TimeWindow.getWindowStartWithOffset(timestamp, offset, slide) to compute lastStart. Unlike SlidingEventTimeWindows, this method first overwrites timestamp with the value of context.getCurrentProcessingTime(). It then loops while start + size > timestamp, subtracting slide from start on each iteration and creating a TimeWindow(start, start + size) each time. getDefaultTrigger returns a ProcessingTimeTrigger, getWindowSerializer returns a TimeWindow.Serializer(), and isEventTime returns false.
- SlidingProcessingTimeWindows provides the static factory method of, which accepts size, slide, and offset. It converts the offset argument to milliseconds and takes it modulo slide.toMilliseconds() to obtain the final offset.
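As a rough illustration of the two points above (the element timestamp being replaced by the current processing time, and the offset being reduced modulo the slide in the factory method), here is a plain-JDK sketch. Nothing in it is Flink API: `ProcessingTimeSlidingSketch` is a made-up class, `java.time.Duration` stands in for Flink's `Time`, and a `LongSupplier` stands in for `context.getCurrentProcessingTime()`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Plain-JDK stand-in for a processing-time sliding assigner; not a Flink class.
final class ProcessingTimeSlidingSketch {
    private final long size;
    private final long slide;
    private final long offset;

    private ProcessingTimeSlidingSketch(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("need 0 <= offset < slide and size > 0");
        }
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    // Factory in the spirit of the described of(...): offset is taken modulo the slide.
    static ProcessingTimeSlidingSketch of(Duration size, Duration slide, Duration offset) {
        return new ProcessingTimeSlidingSketch(
                size.toMillis(), slide.toMillis(), offset.toMillis() % slide.toMillis());
    }

    long offsetMillis() {
        return offset;
    }

    // The element timestamp is accepted but ignored; only the clock decides the windows.
    List<long[]> assignWindows(long elementTimestamp, LongSupplier clock) {
        long now = clock.getAsLong();
        long lastStart = now - (now - offset + slide) % slide;
        List<long[]> windows = new ArrayList<>();
        for (long start = lastStart; start + size > now; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A 7 s offset with a 5 s slide is normalized to 2 s.
        ProcessingTimeSlidingSketch assigner =
                of(Duration.ofSeconds(10), Duration.ofSeconds(5), Duration.ofSeconds(7));
        System.out.println("offset ms = " + assigner.offsetMillis());
        // Fixed clock at 12 s; the element timestamp (1) plays no role.
        for (long[] w : assigner.assignWindows(1L, () -> 12_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Injecting the clock as a `LongSupplier` is only a convenience for this sketch: it makes the result deterministic, whereas a real processing-time assigner would yield different windows depending on when the element arrives.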
Summary
- Flink's Sliding Window comes in two variants, SlidingEventTimeWindows and SlidingProcessingTimeWindows. Both extend WindowAssigner, with Object as the element type and TimeWindow as the window type. Both take three parameters, size, slide, and offset: offset must be greater than or equal to 0 and less than slide, and size must be greater than 0.
- WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime, and also defines the abstract static class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type. The window type of SlidingEventTimeWindows and SlidingProcessingTimeWindows is TimeWindow, which has start and end properties; start is inclusive and end is exclusive, and maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset: the former merges overlapping time windows, and the latter returns the window start for a given timestamp, offset, and windowSize.
- SlidingEventTimeWindows and SlidingProcessingTimeWindows differ in their assignWindows, getDefaultTrigger, and isEventTime methods. The former's assignWindows uses the timestamp passed as a parameter, while the latter uses context.getCurrentProcessingTime(). The former's getDefaultTrigger returns an EventTimeTrigger, while the latter's returns a ProcessingTimeTrigger. The former's isEventTime returns true, while the latter's returns false.
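The half-open window semantics mentioned in the summary (inclusive start, exclusive end, maxTimestamp equal to end - 1) can be made concrete with a tiny stand-in class. This is a sketch only, not Flink's `TimeWindow`: the class name `TimeWindowSketch` and the `contains` helper are hypothetical and exist just to show the boundary behavior.

```java
// Minimal stand-in for a time window with inclusive start and exclusive end;
// not Flink's TimeWindow.
final class TimeWindowSketch {
    final long start; // inclusive
    final long end;   // exclusive

    TimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp that still belongs to this window.
    long maxTimestamp() {
        return end - 1;
    }

    // Hypothetical helper: true when the timestamp lies in [start, end).
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    public static void main(String[] args) {
        TimeWindowSketch w = new TimeWindowSketch(10L, 20L);
        System.out.println(w.maxTimestamp());   // 19
        System.out.println(w.contains(10L));    // true: start is inclusive
        System.out.println(w.contains(20L));    // false: end is exclusive
    }
}
```

Because the end is exclusive, a timestamp that sits exactly on a window boundary belongs to the window that starts there, not to the one that ends there.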
doc
- Sliding Windows