热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

4.1.18Flink流处理框架WindowAPI之窗口函数windowfunction

目录1.windowfunction-窗口函数2.其他可选API3.代码演示3.1TimeWindow3.2CountWindow1.windowfunction-窗口函

目录

1.window function-窗口函数

2.其他可选API

3.代码演示

3.1 TimeWindow

3.2 CountWindow



1.window function-窗口函数

        window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:

  • 增量聚合函数(incremental aggregation functions) 每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有 ReduceFunction, AggregateFunction。(来一个处理一个)
  • 全窗口函数(full window functions) 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。 ProcessWindowFunction和WindowFunction就是全窗口函数。(一个窗口一起处理)

        

2.其他可选API
  •  .trigger() —— 触发器      定义 window 什么时候关闭,触发计算并输出结果
  •  .evitor() —— 移除器       定义移除某些数据的逻辑
  •  .allowedLateness() —— 允许处理迟到的数据
  •  .sideOutputLateData() —— 将迟到的数据放入侧输出流
  •  .getSideOutput() —— 获取侧输出流

3.代码演示

3.1 TimeWindow时间窗口

package com.atguigu.apitest.window;import com.atguigu.apitest.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.Arrays;
import java.util.Collections;public class WindowTest1_TimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// // 从文件读取数据
// DataStream inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");// socket文本流DataStream inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 开窗测试// 1. 增量聚合函数DataStream resultStream = dataStream.keyBy("id")
// .countWindow(10, 2);
// .window(EventTimeSessionWindows.withGap(Time.minutes(1)));
// .window(TumblingProcessingTimeWindows.of(Time.seconds(15))).timeWindow(Time.seconds(15))//聚合统计总数.aggregate(new AggregateFunction() {@Overridepublic Integer createAccumulator() {return 0;}@Overridepublic Integer add(SensorReading value, Integer accumulator) {return accumulator + 1;}@Overridepublic Integer getResult(Integer accumulator) {return accumulator;}@Overridepublic Integer merge(Integer a, Integer b) {return a + b;}});// 2. 全窗口函数SingleOutputStreamOperator> resultStream2 = dataStream.keyBy("id").timeWindow(Time.seconds(15))
// .process(new ProcessWindowFunction() {
// }).apply(new WindowFunction, Tuple, TimeWindow>() {&#64;Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable input, Collector> out) throws Exception {String id &#61; tuple.getField(0);Long windowEnd &#61; window.getEnd();Integer count &#61; IteratorUtils.toList(input.iterator()).size();out.collect(new Tuple3<>(id, windowEnd, count));}});// 3. 其它可选APIOutputTag outputTag &#61; new OutputTag("late") {};SingleOutputStreamOperator sumStream &#61; dataStream.keyBy("id").timeWindow(Time.seconds(15))
// .trigger()
// .evictor().allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).sum("temperature");sumStream.getSideOutput(outputTag).print("late");resultStream2.print();env.execute();}
}

3.2 CountWindow计数窗口

package com.atguigu.apitest.window;
import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class WindowTest2_CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// // 从文件读取数据
// DataStream inputStream &#61; env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");// socket文本流DataStream inputStream &#61; env.socketTextStream("localhost", 7777);// 转换成SensorReading类型DataStream dataStream &#61; inputStream.map(line -> {String[] fields &#61; line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 开计数窗口测试SingleOutputStreamOperator avgTempResultStream &#61; dataStream.keyBy("id").countWindow(10, 2).aggregate(new MyAvgTemp());avgTempResultStream.print();env.execute();}public static class MyAvgTemp implements AggregateFunction, Double>{&#64;Overridepublic Tuple2 createAccumulator() {return new Tuple2<>(0.0, 0);}&#64;Overridepublic Tuple2 add(SensorReading value, Tuple2 accumulator) {return new Tuple2<>(accumulator.f0 &#43; value.getTemperature(), accumulator.f1 &#43; 1);}&#64;Overridepublic Double getResult(Tuple2 accumulator) {return accumulator.f0 / accumulator.f1;}&#64;Overridepublic Tuple2 merge(Tuple2 a, Tuple2 b) {return new Tuple2<>(a.f0 &#43; b.f0, a.f1 &#43; b.f1);}}
}


推荐阅读
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • WinMain 函数详解及示例
    本文详细介绍了 WinMain 函数的参数及其用途,并提供了一个具体的示例代码来解析 WinMain 函数的实现。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • [转]doc,ppt,xls文件格式转PDF格式http:blog.csdn.netlee353086articledetails7920355确实好用。需要注意的是#import ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
    字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ... [详细]
  • 在Delphi7下要制作系统托盘,只能制作一个比较简单的系统托盘,因为ShellAPI文件定义的TNotifyIconData结构体是比较早的版本。定义如下:1234 ... [详细]
  • 第二十五天接口、多态
    1.java是面向对象的语言。设计模式:接口接口类是从java里衍生出来的,不是python原生支持的主要用于继承里多继承抽象类是python原生支持的主要用于继承里的单继承但是接 ... [详细]
  • 在Windows系统中安装TensorFlow GPU版的详细指南与常见问题解决
    在Windows系统中安装TensorFlow GPU版是许多深度学习初学者面临的挑战。本文详细介绍了安装过程中的每一个步骤,并针对常见的问题提供了有效的解决方案。通过本文的指导,读者可以顺利地完成安装并避免常见的陷阱。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在Android平台中,播放音频的采样率通常固定为44.1kHz,而录音的采样率则固定为8kHz。为了确保音频设备的正常工作,底层驱动必须预先设定这些固定的采样率。当上层应用提供的采样率与这些预设值不匹配时,需要通过重采样(resample)技术来调整采样率,以保证音频数据的正确处理和传输。本文将详细探讨FFMpeg在音频处理中的基础理论及重采样技术的应用。 ... [详细]
author-avatar
羚之舞
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有