热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink基石—Window

Flink基石----Window目录Flink基石----Window一、TimeWindow----时间窗口1、TumblingProcessingTimeWindows---

Flink基石----Window


目录



  • Flink基石----Window

    • 一、Time Window----时间窗口

      • 1、TumblingProcessingTimeWindows----滚动的处理时间窗口

      • 2、TumblingEventTimeWindows----滚动的事件时间窗口

      • 3、SlidingProcessingTimeWindows:----滑动的处理时间窗口



    • 二、Session Window----会话窗口

      • 1、ProcessingTimeSessionWindows---- 处理时间的会话窗口

      • 2、EventTimeSessionWindows: 事件时间的会话窗口



    • 三、Count Window----统计窗口




Flink中的Window包含三部分:

1、Time Window----时间窗口

2、Session Window----会话窗口(待没有数据的时候开始计算)

3、Count Window----统计窗口(每n条数据计算一次)

image



一、Time Window----时间窗口


时间窗口包含四部分:

TumblingProcessingTimeWindows:滚动的处理时间窗口
TumblingEventTimeWindows:滚动的事件时间窗口(需要设置时间字段和水位线)
SlidingProcessingTimeWindows: 滑动的处理时间窗口(滑动窗口需要指定窗口大小和滑动时间)
SlidingEventTimeWindows:滑动的事件时间窗口(滑动窗口需要指定窗口大小和滑动时间)
滚动:两个时间窗口之间没有交叉; 滑动:两个时间窗口之间有交叉


1、TumblingProcessingTimeWindows----滚动的处理时间窗口

package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
/**
* 滚动的处理时间窗口
* .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
* 简写:
* .timeWindow(Time.seconds(5))
*/
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}

2、TumblingEventTimeWindows----滚动的事件时间窗口

滚动的事件时间窗口:需要设置时间字段和水位线


package com.shujia.flink.window
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
//设置时间字段, 水位线默认等于最新数据的时间戳,水位线只增加不减少
val assDS: DataStream[(String, Int)] = kvDS.assignTimestampsAndWatermarks(
//执行水位线前移的时间
new BoundedOutOfOrdernessTimestampExtractor[(String, Int)](Time.seconds(5)) {
//指定时间戳字段
override def extractTimestamp(element: (String, Int)): Int = element._2
}
)
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))//上面那一行是本行的简写
.sum(1)
.print()
env.execute()
}
}

3、SlidingProcessingTimeWindows:----滑动的处理时间窗口

滑动窗口需要指定窗口大小和滑动时间


package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}

二、Session Window----会话窗口


待没有数据的时候开始计算,将前面的数据放到一个窗口中进行计算,每一个key是独立计时的

会话窗口包含两种:

ProcessingTimeSessionWindows: 处理时间的会话窗口
EventTimeSessionWindows: 事件时间的会话窗口(需要设置时间字段和水位线)


1、ProcessingTimeSessionWindows---- 处理时间的会话窗口

package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
//当间隔5秒后,没有数据传入,那么开始计算
.sum(1)
.print()
env.execute()
}
}

2、EventTimeSessionWindows: 事件时间的会话窗口

需要设置时间字段和水位线


package com.shujia.flink.window
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object Demo2SessionWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//当数据量比较小时,将并行度设置为1
env.setParallelism(1)
//设置时间模式为事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
val split: Array[String] = line.split(",")
(split(0), split(1).toLong)
})
//设置水位线和时间字段
val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
//执行水位线前移的时间
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
//指定时间戳字段
override def extractTimestamp(element: (String, Long)): LOng= element._2
}
)
assDS
.map(kv => (kv._1, 1))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}

三、Count Window----统计窗口

package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
object Demo3CountWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
/**
* 滚动的统计窗口
* 滑动的统计窗口
*
*/
kvDS
.keyBy(_._1)
.countWindow(10)//滚动的统计窗口---每隔10条数据计算一次
.countWindow(10, 2) //每隔两条数据将最近的10条数据放到一个窗口中进行计算
.sum(1)
.print()
env.execute()
}
}


推荐阅读
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • Flink(三)IDEA开发Flink环境搭建与测试
    一.IDEA开发环境1.pom文件设置1.8 ... [详细]
  • 在计算机领域,数据仓库(DW或DWH),是一个用于报告和数据分析的零碎,被认为是商业智能的一个外围组成部分。它将以后和历史数据存储在一个中央,为整个企 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • 本文介绍了使用Python解析C语言结构体的方法,包括定义基本类型和结构体类型的字典,并提供了一个示例代码,展示了如何解析C语言结构体。 ... [详细]
  • 本文介绍了在Python中使用zlib模块进行字符串的压缩与解压缩的方法,并探讨了其在内存优化方面的应用。通过压缩存储URL等长字符串,可以大大降低内存消耗,虽然处理时间会增加,但是整体效果显著。同时,给出了参考链接,供进一步学习和应用。 ... [详细]
  • 前言本篇为大家总结社区多人合作常见的场景和对应的git操作命令。本篇非新手教程,阅读本篇前需具备Git基础知识。Git入门教程请参考https://www ... [详细]
  • 透明木头问世!“木头大王”胡良兵再发顶刊,已成立公司加速落地69
    道翰天琼认知智能机器人平台API接口大脑为您揭秘。木材是人类最古老的建筑材料之一,也是一种绿色节能材料,我们对其外观的认知可谓根深蒂固。如今,随着透明木材的问世,这一观感将被颠覆。 ... [详细]
  • 5.单例模式classMarker(valcolor:String){类中的任何代码段作为构造函数的一部分println(Creating+this)over ... [详细]
  • 这一节是此方案设计的最核心部分,指针能否准确的定位出来是最关键的问题,在学习过程中,初步采用几种方案来尽可能的提高指针定位的精度: ... [详细]
  • 方法的返回值基本数据类型:byte,short,int,long,float,double,char,boolean引用数据类型:类:当类 ... [详细]
author-avatar
飘飘秀秀真人_562
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有