热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink自定义trigger同时按照计数和时间触发窗口计算

自定义窗口实现同时按照计数和时间(processing-time)触发计算 TriggersA Trigger determineswhenawindow(asformedbyth

自定义窗口 实现同时按照计数和时间(processing-time)触发计算

 

Triggers

Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.

Two things to notice about the above methods are:

1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:

  • CONTINUE: do nothing,
  • FIRE: trigger the computation,
  • PURGE: clear the elements in the window, and
  • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.

2) Any of these methods can be used to register processing- or event-time timers for future actions.

 

trigger 接口有5个方法如下:

       onElement()方法,每个元素被添加到窗口时调用
  
onEventTime()方法,当一个已注册的事件时间计时器启动时调用
  onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
  
onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
  *最后一个clear()方法执行任何需要清除的相应窗口

TriggerResult返回的操作:

CONTINUE:什么也不做
FIRE:触发计算
PURGE:清除窗口中的数据
FIRE_AND_PURGE:触发计算并清除窗口中的数据

 

按照计数触发

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx){
ctx.registerEventTimeTimer(window.maxTimestamp());
if (flag > 4) {
flag = 0;
return TriggerResult.FIRE;
}else{
flag ++;
}
System.out.println("onElement: " + element);
return TriggerResult.CONTINUE;
}

按照时间(processing-time)触发:

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}

 

SocketWindowWordCount.java

package com.flink.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.awt.image.ImagingOpException;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
/***
* 1:在192.168.3.101 服务器执行命令: nc -l 9000
* 2: 启动该服务
* 3: 在命令行发数据
*/
// 创建 execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
DataStream text = env.socketTextStream("101.132.79.68", 9000, "\n");
// 解析数据,按 word 分组,开窗,聚合
DataStream> windowCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(20))
.trigger(CustomTrigger.create())
.sum(1);
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
class CustomTrigger extends Trigger {
private static final long serialVersiOnUID= 1L;
public CustomTrigger() {}
private static int flag = 0;
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx){
ctx.registerEventTimeTimer(window.maxTimestamp());
if (flag > 4) {
flag = 0;
return TriggerResult.FIRE;
}else{
flag ++;
}
System.out.println("onElement: " + element);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception{
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception{
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public String toString(){
return "CustomTrigger";
}
// @Override
// public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
// long windowMaxTimestamp = window.maxTimestamp();
// if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
// ctx.registerProcessingTimeTimer(windowMaxTimestamp);
// }
// }
public static CustomTrigger create(){
return new CustomTrigger();
}
}

 

结果显示:

服务器发送数据

《Flink 自定义trigger 同时按照计数和时间触发窗口计算》

客户端接收:

《Flink 自定义trigger 同时按照计数和时间触发窗口计算》

其中计数(onElement)触发而打印

===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (world,1)
===========onElement: (hadoop,1)
(java,1)

其中时间(onProcessingTime)触发而打印

(world,1)
(hadoop,1)
(hello,3)
(java,1)

 

参考:

flink官网:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html

https://www.jianshu.com/p/1dacf2f84325

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/88014778

 

 


推荐阅读
  • 本文将指导如何向ReactJS计算器应用添加必要的功能,使其能够响应用户操作并正确计算数学表达式。 ... [详细]
  • 利用YAML配置Resilience4J的Circuit Breaker
    本文探讨了Resilience4j作为现代Java应用程序中不可或缺的容错工具,特别介绍了如何通过YAML文件配置Circuit Breaker以提高服务的弹性和稳定性。 ... [详细]
  • Symfony是一个功能强大的PHP框架,以其依赖注入(DI)特性著称。许多流行的PHP框架如Drupal和Laravel的核心组件都基于Symfony构建。本文将详细介绍Symfony的安装方法及其基本使用。 ... [详细]
  • 本文探讨了如何在Classic ASP中实现与PHP的hash_hmac('SHA256', $message, pack('H*', $secret))函数等效的哈希生成方法。通过分析不同实现方式及其产生的差异,提供了一种使用Microsoft .NET Framework的解决方案。 ... [详细]
  • 使用JS、HTML5和C3创建自定义弹出窗口
    本文介绍如何结合JavaScript、HTML5和C3.js来实现一个功能丰富的自定义弹出窗口。通过具体的代码示例,详细讲解了实现过程中的关键步骤和技术要点。 ... [详细]
  • 本文探讨了如何利用HTML5和JavaScript在浏览器中进行本地文件的读取和写入操作,并介绍了获取本地文件路径的方法。HTML5提供了一系列API,使得这些操作变得更加简便和安全。 ... [详细]
  • 一个登陆界面
    预览截图html部分123456789101112用户登入1314邮箱名称邮箱为空15密码密码为空16登 ... [详细]
  • Logback使用小结
    1一定要使用slf4j的jar包,不要使用apachecommons的jar。否则滚动生成文件不生效,不滚动的时候却生效~~importorg.slf ... [详细]
  • 本文深入探讨了 PHP 实现计划任务的方法,包括其原理、具体实现方式以及在不同操作系统中的应用。通过详细示例和代码片段,帮助开发者理解和掌握如何高效地设置和管理定时任务。 ... [详细]
  • 本文深入探讨了 Delphi 中类对象成员的核心概念,包括 System 单元的基础知识、TObject 类的定义及其方法、TClass 的作用以及对象的消息处理机制。文章不仅解释了这些概念的基本原理,还提供了丰富的补充和专业解答,帮助读者全面理解 Delphi 的面向对象编程。 ... [详细]
  • [Vue.js 3.0] Guide – Scaling Up – State Management
    [Vue.js 3.0] Guide – Scaling Up – State Management ... [详细]
  • 本文介绍了如何通过ARM编译器组件重定向标准C运行时库的I/O函数,以适应不同的硬件平台。原文链接:https://www.keil.com/pack/doc/compiler/RetargetIO/html/retarget_overview.html ... [详细]
  • 交互式左右滑动导航菜单设计
    本文介绍了一种使用HTML和JavaScript实现的左右可点击滑动导航菜单的方法,适用于需要展示多个链接或项目的网页布局。 ... [详细]
  • JavaScript:简洁与复杂之间的平衡
    本文探讨了在编写JavaScript教程时,如何在保持内容简洁的同时,确保初学者能够理解并应用实际开发中的复杂问题。文章通过具体示例分析了不同层次的JavaScript代码实现。 ... [详细]
  • 本文介绍了如何使用JavaScript和jQuery实现页面元素随着滚动条的移动而相应变化位置的功能,提供了一段简洁的代码示例。 ... [详细]
author-avatar
LDP-liu
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有