
Flink custom trigger: firing the window computation by both count and time


A custom trigger that fires the window computation both by element count and by time (processing time)

 

Triggers

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.

Two things to notice about the above methods are:

1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:

  • CONTINUE: do nothing,
  • FIRE: trigger the computation,
  • PURGE: clear the elements in the window, and
  • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.

2) Any of these methods can be used to register processing- or event-time timers for future actions.
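
To make the four results concrete, here is a minimal plain-Java model of how a window operator might react to each one. It is a sketch, not Flink code: the names TriggerResultModel, Result, and handle are hypothetical stand-ins, and the real enum in Flink is TriggerResult.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Standalone model of the four trigger results; not the Flink API. */
final class TriggerResultModel {

    /** Hypothetical stand-in for Flink's TriggerResult. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;   // emit the window's current contents
        final boolean purge;  // drop the window's contents afterwards

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /**
     * Applies a result to a window buffer and returns what was emitted
     * (an empty list if nothing fired).
     */
    static List<String> handle(Result result, List<String> buffer) {
        List<String> emitted = new ArrayList<>();
        if (result.fire) {
            emitted.addAll(buffer);
        }
        if (result.purge) {
            buffer.clear();
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            List<String> buffer = new ArrayList<>(Arrays.asList("a", "b"));
            List<String> emitted = handle(r, buffer);
            System.out.println(r + ": emitted=" + emitted + ", remaining=" + buffer);
        }
    }
}
```

The point to take away is that FIRE leaves the window contents in place, so a later firing of the same window emits them again together with anything added since.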

 


 

Triggering by count:

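@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
    // Register a processing-time timer at the end of the window so that
    // onProcessingTime() is called even if the count is never reached.
    ctx.registerProcessingTimeTimer(window.maxTimestamp());
    // flag counts the elements seen since the last count-based firing.
    // Because the test is flag > 4, every sixth element fires.
    if (flag > 4) {
        flag = 0;
        return TriggerResult.FIRE;
    } else {
        flag++;
    }
    System.out.println("onElement: " + element);
    return TriggerResult.CONTINUE;
}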
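
The counter logic is easy to misread, so here is a small standalone replay of it in plain Java (no Flink involved). The class CountFlagReplay and the method firingPositions are hypothetical names used only for this illustration; the branch itself is copied from onElement() above.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone replay of the counter logic from onElement(); not Flink code. */
final class CountFlagReplay {

    private int flag = 0; // plays the role of the field `flag` in the trigger

    /** Returns true when the element at this call would produce FIRE. */
    boolean onElement() {
        if (flag > 4) {
            flag = 0;
            return true;
        }
        flag++;
        return false;
    }

    /** Returns the 1-based positions of the elements that fire. */
    static List<Integer> firingPositions(int elementCount) {
        CountFlagReplay replay = new CountFlagReplay();
        List<Integer> positions = new ArrayList<>();
        for (int i = 1; i <= elementCount; i++) {
            if (replay.onElement()) {
                positions.add(i);
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        System.out.println(firingPositions(12)); // prints [6, 12]
    }
}
```

The first five elements only increment the counter; the sixth finds flag equal to 5, resets it, and fires. The firing element is not counted toward the next round, so the pattern repeats every six elements.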

Triggering by time (processing time):

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.FIRE;
}

 

SocketWindowWordCount.java

package com.flink.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        /*
         * 1: On the server 192.168.3.101, run: nc -l 9000
         * 2: Start this job
         * 3: Type data into the nc session
         */
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read input from a socket on port 9000 of the host running nc;
        // if port 9000 is already in use, pick another port
        DataStream<String> text = env.socketTextStream("101.132.79.68", 9000, "\n");
        // Parse the lines, key by word, open a window, and aggregate
        DataStream<Tuple2<String, Integer>> windowCounts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(20))
                .trigger(CustomTrigger.create())
                .sum(1);
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }
}

class CustomTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    public CustomTrigger() {}

    // Element counter. It is static, so it is shared by all keys and windows
    // in the same JVM and is not part of Flink's checkpointed state.
    private static int flag = 0;

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Processing-time timer at the end of the window (time-based firing)
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        // Count-based firing: every sixth element fires
        if (flag > 4) {
            flag = 0;
            return TriggerResult.FIRE;
        } else {
            flag++;
        }
        System.out.println("onElement: " + element);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // Event-time timers are not used by this trigger
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public String toString() {
        return "CustomTrigger";
    }

    // @Override
    // public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
    //     long windowMaxTimestamp = window.maxTimestamp();
    //     if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
    //         ctx.registerProcessingTimeTimer(windowMaxTimestamp);
    //     }
    // }

    public static CustomTrigger create() {
        return new CustomTrigger();
    }
}

 

Results:

Data sent from the server:

[Screenshot not preserved]

Data received by the client:

[Screenshot not preserved]

Output printed when the count rule (onElement) fires:

===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (world,1)
===========onElement: (hadoop,1)
(java,1)

Output printed when the time rule (onProcessingTime) fires:

(world,1)
(hadoop,1)
(hello,3)
(java,1)
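
The two outputs above can be reproduced with a small plain-Java replay. This is a sketch of the behavior, not Flink code, and it rests on assumptions: the six words arrive in the order hello, hello, hello, world, hadoop, java, all within one 20-second window, and a single counter is shared by all keys. The class KeyedFiringReplay and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Standalone replay of the sample run; a model of the behavior, not Flink code. */
final class KeyedFiringReplay {

    // Running sum per word, standing in for the per-key window contents.
    private final Map<String, Integer> sums = new LinkedHashMap<>();
    // Counter shared by all keys, like the static `flag` in CustomTrigger.
    private int flag = 0;

    /** Adds one word; returns the emitted record if the count rule fires, else null. */
    String onElement(String word) {
        sums.merge(word, 1, Integer::sum); // the element joins its window first
        if (flag > 4) {
            flag = 0;
            // FIRE without PURGE: emit this key's sum and keep it in the window.
            return format(word, sums.get(word));
        }
        flag++;
        return null;
    }

    /** Models the processing-time timers at window end: every key's window fires. */
    List<String> onWindowEnd() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sums.entrySet()) {
            out.add(format(e.getKey(), e.getValue()));
        }
        return out;
    }

    private static String format(String word, int sum) {
        return "(" + word + "," + sum + ")";
    }

    public static void main(String[] args) {
        KeyedFiringReplay replay = new KeyedFiringReplay();
        for (String word : "hello hello hello world hadoop java".split(" ")) {
            String fired = replay.onElement(word);
            if (fired != null) {
                System.out.println("count firing: " + fired);
            }
        }
        System.out.println("time firing: " + replay.onWindowEnd());
    }
}
```

Under these assumptions the count rule emits only (java,1), because the trigger runs for the window of the key whose element arrived sixth. At window end every key's window fires, and (java,1) appears a second time because FIRE does not purge the window. The order of the four records in a real run depends on the order in which the timers fire, so it may differ from the order produced here.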

 

References:

Flink official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html

https://www.jianshu.com/p/1dacf2f84325

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/88014778

 

 


LDP-liu