热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink自定义trigger同时按照计数和时间触发窗口计算

自定义窗口实现同时按照计数和时间(processing-time)触发计算 TriggersA Trigger determineswhenawindow(asformedbyth

自定义窗口 实现同时按照计数和时间(processing-time)触发计算

 

Triggers

Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.

Two things to notice about the above methods are:

1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:

  • CONTINUE: do nothing,
  • FIRE: trigger the computation,
  • PURGE: clear the elements in the window, and
  • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.

2) Any of these methods can be used to register processing- or event-time timers for future actions.

 

trigger 接口有5个方法如下:

       onElement()方法,每个元素被添加到窗口时调用
  
onEventTime()方法,当一个已注册的事件时间计时器启动时调用
  onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
  
onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
  *最后一个clear()方法执行任何需要清除的相应窗口

TriggerResult返回的操作:

CONTINUE:什么也不做
FIRE:触发计算
PURGE:清除窗口中的数据
FIRE_AND_PURGE:触发计算并清除窗口中的数据

 

按照计数触发

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx){
ctx.registerEventTimeTimer(window.maxTimestamp());
if (flag > 4) {
flag = 0;
return TriggerResult.FIRE;
}else{
flag ++;
}
System.out.println("onElement: " + element);
return TriggerResult.CONTINUE;
}

按照时间(processing-time)触发:

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}

 

SocketWindowWordCount.java

package com.flink.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.awt.image.ImagingOpException;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
/***
* 1:在192.168.3.101 服务器执行命令: nc -l 9000
* 2: 启动该服务
* 3: 在命令行发数据
*/
// 创建 execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
DataStream text = env.socketTextStream("101.132.79.68", 9000, "\n");
// 解析数据,按 word 分组,开窗,聚合
DataStream> windowCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(20))
.trigger(CustomTrigger.create())
.sum(1);
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
class CustomTrigger extends Trigger {
private static final long serialVersiOnUID= 1L;
public CustomTrigger() {}
private static int flag = 0;
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx){
ctx.registerEventTimeTimer(window.maxTimestamp());
if (flag > 4) {
flag = 0;
return TriggerResult.FIRE;
}else{
flag ++;
}
System.out.println("onElement: " + element);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception{
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception{
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public String toString(){
return "CustomTrigger";
}
// @Override
// public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
// long windowMaxTimestamp = window.maxTimestamp();
// if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
// ctx.registerProcessingTimeTimer(windowMaxTimestamp);
// }
// }
public static CustomTrigger create(){
return new CustomTrigger();
}
}

 

结果显示:

服务器发送数据

《Flink 自定义trigger 同时按照计数和时间触发窗口计算》

客户端接收:

《Flink 自定义trigger 同时按照计数和时间触发窗口计算》

其中计数(onElement)触发而打印

===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (hello,1)
===========onElement: (world,1)
===========onElement: (hadoop,1)
(java,1)

其中时间(onProcessingTime)触发而打印

(world,1)
(hadoop,1)
(hello,3)
(java,1)

 

参考:

flink官网:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html

https://www.jianshu.com/p/1dacf2f84325

https://blog.csdn.net/rlnLo2pNEfx9c/article/details/88014778

 

 


推荐阅读
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 时序数据是指按时间顺序排列的数据集。通过时间轴上的数据点连接,可以构建多维度报表,揭示数据的趋势、规律及异常情况。 ... [详细]
  • 本文详细探讨了Java中的24种设计模式及其应用,并介绍了七大面向对象设计原则。通过创建型、结构型和行为型模式的分类,帮助开发者更好地理解和应用这些模式,提升代码质量和可维护性。 ... [详细]
  • 本文介绍了Java并发库中的阻塞队列(BlockingQueue)及其典型应用场景。通过具体实例,展示了如何利用LinkedBlockingQueue实现线程间高效、安全的数据传递,并结合线程池和原子类优化性能。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • Android 渐变圆环加载控件实现
    本文介绍了如何在 Android 中创建一个自定义的渐变圆环加载控件,该控件已在多个知名应用中使用。我们将详细探讨其工作原理和实现方法。 ... [详细]
  • 在金融和会计领域,准确无误地填写票据和结算凭证至关重要。这些文件不仅是支付结算和现金收付的重要依据,还直接关系到交易的安全性和准确性。本文介绍了一种使用C语言实现小写金额转换为大写金额的方法,确保数据的标准化和规范化。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 机器学习中的相似度度量与模型优化
    本文探讨了机器学习中常见的相似度度量方法,包括余弦相似度、欧氏距离和马氏距离,并详细介绍了如何通过选择合适的模型复杂度和正则化来提高模型的泛化能力。此外,文章还涵盖了模型评估的各种方法和指标,以及不同分类器的工作原理和应用场景。 ... [详细]
  • 本教程涵盖OpenGL基础操作及直线光栅化技术,包括点的绘制、简单图形绘制、直线绘制以及DDA和中点画线算法。通过逐步实践,帮助读者掌握OpenGL的基本使用方法。 ... [详细]
  • 在现代Web应用中,当用户滚动到页面底部时,自动加载更多内容的功能变得越来越普遍。这种无刷新加载技术不仅提升了用户体验,还优化了页面性能。本文将探讨如何实现这一功能,并介绍一些实际应用案例。 ... [详细]
  • 本文深入探讨了WPF框架下的数据验证机制,包括内置验证规则的使用、自定义验证规则的实现方法、错误信息的有效展示策略以及验证时机的选择,旨在帮助开发者构建更加健壮和用户友好的应用程序。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • Storm集成Kakfa
    一、整合说明Storm官方对Kafka的整合分为两个版本,官方说明文档分别如下:StormKafkaIntegratio ... [详细]
author-avatar
LDP-liu
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有