Author: LDP-liu | Source: Internet | 2024-09-27 10:30
Custom windows: triggering computation by both count and time (processing-time)
Triggers
A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).
The trigger interface has five methods that allow a Trigger to react to different events:
- The onElement() method is called for each element that is added to a window.
- The onEventTime() method is called when a registered event-time timer fires.
- The onProcessingTime() method is called when a registered processing-time timer fires.
- The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
- Finally, the clear() method performs any action needed upon removal of the corresponding window.
Two things to notice about the above methods are:
1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:
- CONTINUE: do nothing,
- FIRE: trigger the computation,
- PURGE: clear the elements in the window, and
- FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
2) Any of these methods can be used to register processing- or event-time timers for future actions.
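In short, the Trigger interface has the following five methods:
- onElement(): called for each element added to a window.
- onEventTime(): called when a registered event-time timer fires.
- onProcessingTime(): called when a registered processing-time timer fires.
- onMerge(): relevant for stateful triggers; merges the states of two triggers when their windows merge, for example with session windows.
- clear(): performs any cleanup needed when the corresponding window is removed.
The actions a TriggerResult can carry:
- CONTINUE: do nothing
- FIRE: trigger the computation
- PURGE: clear the data in the window
- FIRE_AND_PURGE: trigger the computation and then clear the data in the window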
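The difference between FIRE and FIRE_AND_PURGE is easiest to see on a small buffer. The following is a minimal plain-Java sketch, not Flink code: the Action enum and the WindowBufferSketch class are hypothetical names introduced only for illustration, and the window function is assumed to be a simple sum.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of the four trigger outcomes (hypothetical, not the Flink enum). */
enum Action {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    final boolean fire;   // evaluate the window function?
    final boolean purge;  // drop the buffered elements afterwards?

    Action(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

/** Toy window buffer whose "window function" is assumed to be a sum. */
class WindowBufferSketch {
    final List<Integer> buffer = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    void add(int value) {
        buffer.add(value);
    }

    void apply(Action action) {
        if (action.fire) {
            int sum = 0;
            for (int v : buffer) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (action.purge) {
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        WindowBufferSketch w = new WindowBufferSketch();
        w.add(1);
        w.add(1);
        w.apply(Action.FIRE);            // emits 2 and keeps both elements
        w.add(1);
        w.apply(Action.FIRE_AND_PURGE);  // emits 3, then clears the buffer
        w.add(1);
        w.apply(Action.FIRE);            // emits 1
        System.out.println(w.emitted);   // [2, 3, 1]
    }
}
```

Because FIRE leaves the contents in place, a window that fires several times emits cumulative results, which matters for the trigger built in the rest of this article.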
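Triggering by count:
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
    // Register a processing-time timer for the end of the window, so that
    // onProcessingTime() is invoked even if the count is never reached.
    ctx.registerProcessingTimeTimer(window.maxTimestamp());
    if (flag > 4) {
        // Five elements have already been counted: reset and fire on this one.
        flag = 0;
        return TriggerResult.FIRE;
    } else {
        flag++;
    }
    System.out.println("onElement: " + element);
    return TriggerResult.CONTINUE;
}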
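Note that the condition flag > 4 combined with the reset means the trigger fires on every sixth element, not every fifth. That is consistent with the sample output later in the article, where five onElement lines are printed before the first result. The following plain-Java replay of just the counter logic illustrates this; CounterLogicSketch and firingPositions are hypothetical names used only for this sketch, and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java replay of the counter logic from onElement(); no Flink involved. */
class CounterLogicSketch {
    private int flag = 0;

    /** Returns true where the trigger above would return FIRE. */
    boolean onElement() {
        if (flag > 4) {
            flag = 0;
            return true;
        }
        flag++;
        return false;
    }

    /** 1-based positions of the elements that cause a firing. */
    static List<Integer> firingPositions(int elements) {
        CounterLogicSketch trigger = new CounterLogicSketch();
        List<Integer> positions = new ArrayList<>();
        for (int i = 1; i <= elements; i++) {
            if (trigger.onElement()) {
                positions.add(i);
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        System.out.println(firingPositions(12)); // [6, 12]
    }
}
```

To fire on every fifth element instead, the counter would have to be incremented before the comparison, or the threshold lowered by one.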
Triggering by time (processing-time):
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    // Called when a registered processing-time timer fires: evaluate the window.
    return TriggerResult.FIRE;
}
SocketWindowWordCount.java
package com.flink.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        /*
         * 1: On the server (192.168.3.101 in the original setup), run: nc -l 9000
         * 2: Start this job
         * 3: Type data into the nc session
         */
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read input from a socket. Point the host at the machine running nc;
        // if port 9000 is already in use, pick another port.
        DataStream<String> text = env.socketTextStream("101.132.79.68", 9000, "\n");
        // Parse the data, group by word, window, and aggregate
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(20))
                .trigger(CustomTrigger.create())
                .sum(1);
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
}
class CustomTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    // Element counter. A static field is shared by every key and window in the
    // same JVM and is not checkpointed; a production trigger would keep the
    // count in partitioned state instead.
    private static int flag = 0;

    public CustomTrigger() {}

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Register a processing-time timer for the end of the window, so that
        // onProcessingTime() is invoked even if the count is never reached.
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        if (flag > 4) {
            // Five elements have already been counted: reset and fire on this one.
            flag = 0;
            return TriggerResult.FIRE;
        } else {
            flag++;
        }
        System.out.println("onElement: " + element);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Event time is not used by this trigger.
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Remove the timer registered in onElement() when the window is discarded.
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "CustomTrigger";
    }

    // @Override
    // public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
    //     long windowMaxTimestamp = window.maxTimestamp();
    //     if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
    //         ctx.registerProcessingTimeTimer(windowMaxTimestamp);
    //     }
    // }

    public static CustomTrigger create() {
        return new CustomTrigger();
    }
}
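The job uses timeWindow(Time.seconds(20)), and the trigger registers its timer at window.maxTimestamp(), which is the last millisecond that still belongs to the window (the window end minus one). As a rough illustration of which timestamps those are, here is a minimal plain-Java sketch. It assumes tumbling windows aligned to the epoch with no offset and non-negative timestamps; TumblingWindowSketch and its methods are hypothetical names, not Flink API.

```java
/** Window arithmetic for tumbling windows; assumes no offset and timestamps >= 0. */
class TumblingWindowSketch {
    /** Inclusive start of the window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Last millisecond that belongs to the window: exclusive end minus one. */
    static long maxTimestamp(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 20_000L; // 20 seconds, as in the job above
        System.out.println(windowStart(45_000L, size));  // 40000
        System.out.println(maxTimestamp(45_000L, size)); // 59999
    }
}
```

An element arriving at processing time 45,000 ms would therefore fall into the window [40,000, 60,000), and a timer set to its maximum timestamp goes off at 59,999 ms.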
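Results:
The server sends the data (typed into the nc session).
The client receives:
Output printed when the count-based (onElement) path fires: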
===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (world,1)
===========onElement: (hadoop,1)
(java,1)
Output printed when the time-based (onProcessingTime) path fires:
(world,1)
(hadoop,1)
(hello,3)
(java,1)
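One way to read this output: the counter is shared by all keys, so the sixth element fires only the window of the key it belongs to (java), while the timer at the end of the window fires once per key. Because FIRE does not purge, java is emitted a second time. The toy replay below models that reading in plain Java. It is a sketch under assumptions, not Flink internals: the input order is inferred from the log lines above, CountOrTimeReplay is a hypothetical name, and the order in which keys fire on the timer is not meaningful.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy replay of one window: shared counter, per-key sums, FIRE without purge. */
class CountOrTimeReplay {
    private final Map<String, Integer> sums = new LinkedHashMap<>(); // per-key window contents
    private int flag = 0;                                            // counter shared by all keys
    final List<String> output = new ArrayList<>();

    /** An element arrives: update the key's sum, then apply the counter logic. */
    void onElement(String word) {
        sums.merge(word, 1, Integer::sum);
        if (flag > 4) {
            flag = 0;
            emit(word); // FIRE: only this key's window is evaluated
        } else {
            flag++;
        }
    }

    /** The end-of-window timer goes off: every key with data fires once. */
    void onTimer() {
        for (String word : sums.keySet()) {
            emit(word);
        }
    }

    private void emit(String word) {
        // FIRE keeps the window contents, so later firings see the same sums.
        output.add("(" + word + "," + sums.get(word) + ")");
    }

    static List<String> replay(String... words) {
        CountOrTimeReplay r = new CountOrTimeReplay();
        for (String w : words) {
            r.onElement(w);
        }
        r.onTimer();
        return r.output;
    }

    public static void main(String[] args) {
        System.out.println(replay("hello", "hello", "hello", "world", "hadoop", "java"));
        // [(java,1), (hello,3), (world,1), (hadoop,1), (java,1)]
    }
}
```

Returning FIRE_AND_PURGE from the count path instead would stop java from being emitted twice, at the cost of dropping that key's accumulated value.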
References:
Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html
https://www.jianshu.com/p/1dacf2f84325
https://blog.csdn.net/rlnLo2pNEfx9c/article/details/88014778