作者:飘飘秀秀真人_562 | 来源:互联网 | 2023-06-22 17:35
Flink基石----Window目录Flink基石----Window一、TimeWindow----时间窗口1、TumblingProcessingTimeWindows---
Flink基石----Window
目录
- Flink基石----Window
- 一、Time Window----时间窗口
- 1、TumblingProcessingTimeWindows----滚动的处理时间窗口
- 2、TumblingEventTimeWindows----滚动的事件时间窗口
- 3、SlidingProcessingTimeWindows:----滑动的处理时间窗口
- 二、Session Window----会话窗口
- 1、ProcessingTimeSessionWindows---- 处理时间的会话窗口
- 2、EventTimeSessionWindows: 事件时间的会话窗口
- 三、Count Window----统计窗口
Flink中的Window包含三部分:
1、Time Window----时间窗口
2、Session Window----会话窗口(待没有数据的时候开始计算)
3、Count Window----统计窗口(每n条数据计算一次)
一、Time Window----时间窗口
时间窗口包含四部分:
TumblingProcessingTimeWindows:滚动的处理时间窗口
TumblingEventTimeWindows:滚动的事件时间窗口(需要设置时间字段和水位线)
SlidingProcessingTimeWindows: 滑动的处理时间窗口(滑动窗口需要指定窗口大小和滑动时间)
SlidingEventTimeWindows:滑动的事件时间窗口(滑动窗口需要指定窗口大小和滑动时间)
滚动:两个时间窗口之间没有交叉; 滑动:两个时间窗口之间有交叉
1、TumblingProcessingTimeWindows----滚动的处理时间窗口
package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
/**
* 滚动的处理时间窗口
* .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
* 简写:
* .timeWindow(Time.seconds(5))
*/
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}
2、TumblingEventTimeWindows----滚动的事件时间窗口
滚动的事件时间窗口:需要设置时间字段和水位线
package com.shujia.flink.window
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
//设置时间字段, 水位线默认等于最新数据的时间戳,水位线只增加不减少
val assDS: DataStream[(String, Int)] = kvDS.assignTimestampsAndWatermarks(
//执行水位线前移的时间
new BoundedOutOfOrdernessTimestampExtractor[(String, Int)](Time.seconds(5)) {
//指定时间戳字段
override def extractTimestamp(element: (String, Int)): Int = element._2
}
)
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))//上面那一行是本行的简写
.sum(1)
.print()
env.execute()
}
}
3、SlidingProcessingTimeWindows:----滑动的处理时间窗口
滑动窗口需要指定窗口大小和滑动时间
package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}
二、Session Window----会话窗口
待没有数据的时候开始计算,将前面的数据放到一个窗口中进行计算,每一个key是独立计时的
会话窗口包含两种:
ProcessingTimeSessionWindows: 处理时间的会话窗口
EventTimeSessionWindows: 事件时间的会话窗口(需要设置时间字段和水位线)
1、ProcessingTimeSessionWindows---- 处理时间的会话窗口
package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
object Demo1TimeWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//读取socket数据
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//拆分、转成kv格式
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
//将单词分组,添加时间、并统计数量,打印
kvDS.keyBy(_._1)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
//当间隔5秒后,没有数据传入,那么开始计算
.sum(1)
.print()
env.execute()
}
}
2、EventTimeSessionWindows: 事件时间的会话窗口
需要设置时间字段和水位线
package com.shujia.flink.window
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object Demo2SessionWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
//当数据量比较小时,将并行度设置为1
env.setParallelism(1)
//设置时间模式为事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val eventDS: DataStream[(String, Long)] = linesDS.map(line => {
val split: Array[String] = line.split(",")
(split(0), split(1).toLong)
})
//设置水位线和时间字段
val assDS: DataStream[(String, Long)] = eventDS.assignTimestampsAndWatermarks(
//执行水位线前移的时间
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
//指定时间戳字段
override def extractTimestamp(element: (String, Long)): LOng= element._2
}
)
assDS
.map(kv => (kv._1, 1))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.sum(1)
.print()
env.execute()
}
}
三、Count Window----统计窗口
package com.shujia.flink.window
import org.apache.flink.streaming.api.scala._
object Demo3CountWindow {
def main(args: Array[String]): Unit = {
val env: StreamExecutiOnEnvironment= StreamExecutionEnvironment.getExecutionEnvironment
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))
/**
* 滚动的统计窗口
* 滑动的统计窗口
*
*/
kvDS
.keyBy(_._1)
.countWindow(10)//滚动的统计窗口---每隔10条数据计算一次
.countWindow(10, 2) //每隔两条数据将最近的10条数据放到一个窗口中进行计算
.sum(1)
.print()
env.execute()
}
}