目录
Apache Flink® — Stateful Computations over Data Streams
有状态的流是其最大的特性, 可以使用 stage 进行各种状态的保存。
有状态
分布式
并行度
flink 分布式运行环境:
TaskManager -> slot -> task -> 并行度
forward strategy
key based strategy (key by)
broadcast strategy
就是把数据广播到下游的所有task中
random strategy
Operator Chain的条件:
如下 keyBy-sum 会和 map 合并在一个 JVM线程中进行处理,也就是搞成一个task去运行里面的业务逻辑。
Stream Graph
Job Graph
Execution Graph
Physical Execution Graph
获取source的方式
(1)基于文件
readTextFile(path)
读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
(2)基于socket
socketTextStream
从socker中读取数据,元素可以通过一个分隔符切开。
(3)基于集合
fromCollection(Collection)
通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
(4)自定义输入
addSource 可以实现读取第三方数据源的数据
系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
union :
合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的
connect,conMap和conFlatMap :
和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
Split和Select:
根据规则把一个数据流切分为多个流
应用场景:
此机制毫不夸张的讲 是 flink 最牛B的特性之一, 比 spark streaming的 状态管理好用 100倍。就靠这点也足够支持 flink作为最牛逼的实时计算框架。
接下来会对 state 进行非常详细的讲解
state : Flink中有两种基本类型的State:Keyed State,Operator State,他们两种都可以以两种形式存在:原
始状态(raw state)和托管状态(managed state)
托管状态:由Flink框架管理的状态,我们通常使用的就是这种。
原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态
内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用
户自定义的operator时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑他。
Operator State(task级别的)
Keyed State(针对每一个key) 可以理解 这个 stage 最顶层是一个 map, map的key 就是 keyBy的 key,故可以存储和使用具体任意一个 KEY 对应的状态
Flink支持的StateBackend:
MemoryStateBackend
默认情况下,状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到
JobManager 的堆内存中。
缺点:
只能保存数据量小的状态
状态数据有可能会丢失
优点:
开发测试很方便
FsStateBackend
状态信息存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS
等文件系统)
缺点:
状态大小受TaskManager内存限制(默认支持5M)
优点:
状态访问速度很快
状态信息不会丢失
用于: 生产,也可存储状态数据量大的情况
RocksDBStateBackend
状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中
checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
缺点:
状态访问速度有所下降
优点:
可以存储超大量的状态信息
状态信息不会丢失
用于: 生产,可以存储超大量的状态信息
(1)单任务调整
修改当前任务代码
env.setStateBackend(new
FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
(2)全局调整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
概述:
(1)为了保证state的容错性,Flink需要对state进行checkpoint。
(2)Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个
Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩
溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常
(3)Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:
持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列
(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)
用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)
生成快照。5s
恢复快照
默认checkpoint功能是disabled的,想要使用的时候需要先启用,checkpoint开启之后,
checkPointMode有两种,Exactly-once和At-least-once,默认的checkPointMode是Exactly-once,
Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终
延迟为几毫秒)。
默认checkpoint功能是disabled的,想要使用的时候需要先启用
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
env.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的
Checkpoint【详细解释见备注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCl
eanup.RETAIN_ON_CANCELLATION);
默认的重启策略是在 Flink 的配置文件 flink-conf.yaml 中指定。
restart-strategy 具体包括如下:
(1)固定间隔 (Fixed delay)
(2)失败率 (Failure rate)
(3)无重启 (No restart)
没有启动 CK 则肯定没有重启策略,若启用了,则默认是 Fixed delay 的重启策略, 尝试重启次数
默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在
应用代码中动态指定,会覆盖全局配置。
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
第一种:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
));
无重启 (No restart)
第一种:全局配置 flink-conf.yaml
restart-strategy: none
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.noRestart());
默认只保留最近成功生成的1个 checkpoint , 程序失败时从这个CK来进行恢复。但是也可以配置多个CK , 这样恢复能力更强一些。
conf/flink-conf.yam
以下配置最多需要保存CK的个数:
state.checkpoints.num-retained: 20
通过以下查看, 就能知道所有的CK 列表信息,若要恢复回退只需要指定对应的CK启动即可
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
根据每隔5秒执行最近10秒的数据,Flink划分的窗口, 左闭右开, 是按照时间间隔为最小粒度切分窗口的。
[00:00:00, 00:00:05) [00:00:05, 00:00:10)
[00:00:10, 00:00:15) [00:00:15, 00:00:20)
[00:00:20, 00:00:25) [00:00:25, 00:00:30)
[00:00:30, 00:00:35) [00:00:35, 00:00:40)
[00:00:40, 00:00:45) [00:00:45, 00:00:50)
[00:00:50, 00:00:55) [00:00:55, 00:01:00)
[00:01:00, 00:01:05) ...
Event Time:事件产生的时间,它通常由事件中的时间戳描述。
Ingestion time:事件(日志,数据,消息)进入Flink的时间(不考虑)
Processing Time:事件被处理时当前系统的时间
如下图所示,需求模拟, 第13秒发送 2个事件, 第 16秒发送 一个事件:
现在需求是每隔5S计算10S之内的窗口结果;
下图是按照 执行时间窗口计算的示意图:最终计算出来的结果就是
(hadoop,2)
(hadoop,3)
(hadoop,1)
如下图所示 原本 16S 的一个事件延迟到了 19S 才发送出去,最终结果如下;
发现按照执行时间处理的话, 在发生事件延迟到达的时候, 对结果的准确度影响非常大;
(hadoop,1)
(hadoop,3)
(hadoop,2)
关键代码
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
dataStream.map(new MapFunction
@Override
public Tuple2
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2
long previousElementTimestamp) {
return element.f1;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
}
如下图所示执行计算,最终结果如下:
(hadoop,1)
(hadoop,3)
(hadoop,1)
可以发现按照事件时间是比较接近真实结果的, 但是比如第一个窗口消息已经延迟了, 则不会触发计算。 最终放大招,终极解决方案就是 waterMark
如下图所示执行过程,划红线的绿色框就代表着水印, 水印左边的是真实窗口可以理解为水杯中存储的实际的水, 右边的代表水面以上部分到达杯沿, 只有杯沿 时间 大于等于 窗口结束时间时就会触发窗口计算。这也是为啥叫 watermaker的原因;
最终执行结果如下 : 可以发现完全符合预期,可以水印 把 19S延迟到达的消息还是放到 第一个窗口去计算了 。
(hadoop,2)
(hadoop,3)
(hadoop,1)
关键核心代码:
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
dataStream.map(new MapFunction
@Override
public Tuple2
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2
long previousElementTimestamp) {
return element.f1;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
//window延迟5秒触发
return new Watermark(System.currentTimeMillis() - 5000);
}
}
产生的根本原因是消息乱序,延迟到达, kafka 是可能出现消息延迟的, 比如挤压、业务处理延迟、或者消费失败重试 都可以导致先处理的消息把后处理的消息延迟到达;
既然乱序肯定会产生那如何催进窗口触发计算呢? 那当然是通过 watermaker机制,通过此机制触发窗口啥时候 进行计算。
有序的流的watermarks
无序的流的watermarks
多并行度流的watermarks
一个window可能会接受到多个waterMark,我们以最小的为准。
关键代码:
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置waterMark产生的周期为1s
env.getConfig().setAutoWatermarkInterval(1000);
dataStream.map(new MapFunction
@Override
public Tuple2
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime = 0L;
private long maxOutOfOrderness = 10000; // 最大允许的乱序时间 10 秒
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2
long previousElementTimestamp) {
long currentElementEventTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,
currentElementEventTime);
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event
Time
+ "|" +
dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementEventTime;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
/**
* WasterMark会周期性的产生,默认就是每隔200毫秒产生一个
*
* 设置 watermark 产生的周期为 1000ms
* env.getConfig().setAutoWatermarkInterval(1000);
*/
//window延迟5秒触发
System.out.println("water mark...");
return new Watermark(currentMaxEventTime - maxOutOfOrderness);
}
}
}
总结下window 触发计算的机制如下:
也就是 watermaker 驱动着窗口的计算,并且窗口内的必须有数据,并且水印计算的时候 窗口必须按照 事件事件处理
1, watermaker 时间 >= window_end_time
2, 在 [window_start_time, window_end_time) 区间中有数据存在,注意是左闭右开的区间,而且是以 event time 来计算的
上述 使用【WaterMark】机制解决无序 的案例中,WaterMark 要是不够长, 或者消息延迟太多, 还是可能会导致消息丢失不会在窗口中进行计算。
具体延迟太多的消息是否触发计算, 可以考虑直接丢失, 因为延迟太多意义不大; 或者采取其他手段,接下来就介绍这种情况该如何处理。
总体延迟消息的处理一共下述三种方案:
).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.allowedLateness(Time.seconds(2)) // 允许事件迟到 2 秒
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
//步骤一:设置时间类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置waterMark产生的周期为1s
env.getConfig().setAutoWatermarkInterval(1000);
// 保存迟到的,会被丢弃的数据
OutputTag
new OutputTag
SingleOutputStreamOperator
MapFunction
@Override
public Tuple2
String[] fields = line.split(",");
return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
}
//步骤二:获取数据里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(3))
// .allowedLateness(Time.seconds(2)) // 允许事件迟到 2 秒
.sideOutputLateData(outputTag) // 保存迟到太多的数据
.process(new SumProcessWindowFunction());
DataStream
= result.getSideOutput(outputTag).map(new
MapFunction
@Override
public String map(Tuple2
Exception {
return "迟到的数据:" + stringLongTuple2.toString();
}
});
// 以上迟到的消息就可以再保存到 kafka 后续再进行处理;
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime = 0L;
private long maxOutOfOrderness = 10000; // 最大允许的乱序时间 10 秒
// 拿到每一个事件的 Event Time
@Override
public long extractTimestamp(Tuple2
long previousElementTimestamp) {
long currentElementEventTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,
currentElementEventTime);
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event
Time
+ "|" +
dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementEventTime;
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
/**
* WasterMark会周期性的产生,默认就是每隔200毫秒产生一个
*
* 设置 watermark 产生的周期为 1000ms
* env.getConfig().setAutoWatermarkInterval(1000);
*/
System.out.println("water mark...");
return new Watermark(currentMaxEventTime - maxOutOfOrderness);
}
}
}
结论如下:
一个window可能会接受到多个waterMark,我们以最小的为准。
其实 就是 source transform sink 等算子的并发度 大于 1 , 至少2个以上, 会导致
接下来要举例论证上述原理: 下面验证一种场景 3S 滚动窗口计算,按照事件时间处理
1、 设置实时任务并行度为2
按照如下打印执行事件:
当前线程ID、事件时间、max事件时间、水印(max事件时间 - 10S)
打印消费消息结果如下:
当前线程ID:55event = (000001,1461756883000)|19:34:43|19:34:43|19:34:33
当前线程ID:56event = (000001,1461756870000)|19:34:30|19:34:30|19:34:20
当前线程ID:56event = (000001,1461756888000)|19:34:48|19:34:48|19:34:38
会发现最终处理的窗口是 window start time : 19:34:30、window end time : 19:34:33, 此时处理的窗口中的事件是 (000001,1461756870000)|19:34:30
线程ID 56 最终水印以最大的为准,也就是 19:34:38;而此时线程 55 的水印 是 19:34:33 ; 不同线程的水印以最小的为准,所以最终的水印就是 19:34:33 会触发
【19:34:30,19:34:33) 窗口的计算,最终打印的日志也证明了猜想。
窗口通常被区分为不同的类型:
tumbling windows:滚动窗口 【没有重叠】 (time,count)
sliding windows:滑动窗口 【有重叠】(time,count)
session windows:会话窗口 (time)
global windows: 没有窗口
Keyed Window 和 Non Keyed Window
Keyed Windows:
stream
.keyBy(...) 1 <- keyed versus non-keyed windows
.window(...) 1 <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] 1 <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] 1 <- optional: "output tag" (else no side
output for late data)
.reduce/aggregate/fold/apply() 1 <- required: "function"
[.getSideOutput(...)] 1 <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output
for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
global window
global window + trigger 一起配合才能使用
单词每出现三次统计一次
stream.keyBy(0)
.window(GlobalWindows.create())
//如果不加这个程序是启动不起来的
.trigger(CountTrigger.of(3))
.sum(1)
.print();
以上需求实现是每三个元素到来就触发统计,但是 sum stage 会一直累加保留,那有啥办法可以每次只打印最近三个元素的和呢 ? 解决方案就是需要自定义 trigger,状态自己存储,当触发窗口计算时
对已经计算的窗口元素清除。 具体实现见下述所示。
需求:实现每隔3个单词,计算最近的计数,计数结果不能包含之前的计数结果。
/**
* 使用Trigger 自己实现一个类似CountWindow的效果
*/
public class CountWindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
env.socketTextStream("10.148.15.10", 8888);
SingleOutputStreamOperator
dataStream.flatMap(new FlatMapFunction
@Override
public void flatMap(String line, Collector
collector) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
collector.collect(Tuple2.of(word, 1));
}
}
});
WindowedStream
= stream.keyBy(0)
.window(GlobalWindows.create())
.trigger(new MyCountTrigger(3));
//可以看看里面的源码,跟我们写的很像
// WindowedStream
keyedWindow = stream.keyBy(0)
// .window(GlobalWindows.create())
// .trigger(CountTrigger.of(3));
DataStream
wordCounts.print().setParallelism(1);
env.execute("Streaming WordCount");
}
private static class MyCountTrigger
extends Trigger
// 表示指定的元素的最大的数量
private long maxCount;
// 用于存储每个 key 对应的 count 值
private ReducingStateDescriptor
= new ReducingStateDescriptor
ReduceFunction
@Override
public Long reduce(Long aLong, Long t1) throws Exception {
return aLong + t1;
}
}, Long.class);
public MyCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
/**
* 当一个元素进入到一个 window 中的时候就会调用这个方法
* @param element 元素
* @param timestamp 进来的时间
* @param window 元素所属的窗口
* @param ctx 上下文
* @return TriggerResult
* 1. TriggerResult.CONTINUE :表示对 window 不做任何处理
* 2. TriggerResult.FIRE :表示触发 window 的计算
* 3. TriggerResult.PURGE :表示清除 window 中的所有数据
* 4. TriggerResult.FIRE_AND_PURGE :表示先触发 window 计算,然后删除
window 中的数据
* @throws Exception
*/
@Override
public TriggerResult onElement(Tuple2
long timestamp,
GlobalWindow window,
TriggerContext ctx) throws Exception {
// 拿到当前 key 对应的 count 状态值
ReducingState
ctx.getPartitionedState(stateDescriptor);
// count 累加 1
count.add(1L);
// 如果当前 key 的 count 值等于 maxCount
if (count.get() == maxCount) {
count.clear();
// 触发 window 计算,删除数据
return TriggerResult.FIRE_AND_PURGE;
}
// 否则,对 window 不做任何的处理
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time,
GlobalWindow window,
TriggerContext ctx) throws
Exception {
// 写基于 Processing Time 的定时器任务逻辑
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time,
GlobalWindow window,
TriggerContext ctx) throws Exception {
// 写基于 Event Time 的定时器任务逻辑
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws
Exception {
// 清除状态值
ctx.getPartitionedState(stateDescriptor).clear();
}
}
}
需求:实现每隔2个单词,计算最近3个单词; 要求不使用现成的API 实现
实现思路:
1、 定义自定义 trigger 当收集到2个单词之后触发计算
2、自定义 trigger时只会触发计算,不会把窗口中的元素清除掉
3、自定义 Evictor 每次触发计算时,只计算最近三个。把之前的老数据清除掉
4、以上组合就实现了 每隔2个单词,计算最近三个单词的效果。
private static class MyCountEvictor
implements Evictor
// window 的大小
private long windowCount;
public MyCountEvictor(long windowCount) {
this.windowCount = windowCount;
}
/**
* 在 window 计算之前删除特定的数据
* @param elements window 中所有的元素
* @param size window 中所有元素的大小
* @param window window
* @param evictorContext 上下文
*/
@Override
public void evictBefore(Iterable
int size, GlobalWindow window, EvictorContext
evictorContext) {
if (size <= windowCount) {
return;
} else {
int evictorCount = 0;
Iterator
elements.iterator();
while (iterator.hasNext()) {
iterator.next();
evictorCount++;
// 如果删除的数量小于当前的 window 大小减去规定的 window 的大小,就
需要删除当前的元素
if (evictorCount > size - windowCount) {
break;
} else {
iterator.remove();
}
}
}
}
/**
* 在 window 计算之后删除特定的数据
* @param elements window 中所有的元素
* @param size window 中所有元素的大小
* @param window window
* @param evictorContext 上下文
*/
@Override
public void evictAfter(Iterable
4.3.4 window增量聚合
窗口中每进入一条数据,就进行一次计算,等时间到了展示最后的结果
常用的聚合算子
int size, GlobalWindow window, EvictorContext
evictorContext) {
}
}
}
窗口中每进入一条数据,就进行一次计算,等时间到了展示最后的结果; 所以该算子是性能很高的, 只保留最终的状态。
常用的聚合算子
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
等属于窗口的数据到齐,才开始进行聚合计算【可以实现对窗口内的数据进行排序等需求】
apply(windowFunction)
process(processWindowFunction)
processWindowFunction比windowFunction提供了更多的上下文信息。类似于map和RichMap的关系
两个window之间可以进行join,join操作只支持三种类型的window:滚动窗口,滑动窗口,会话窗口
stream.join(otherStream) //两个流进行关联
.where(
.equalTo(
.window(
.apply(
DataStream
DataStream
orangeStream.join(greenStream)
.where(
.equalTo(
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
DataStream
DataStream
orangeStream.join(greenStream)
.where(
.equalTo(
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */,
Time.milliseconds(1) /* slide */))
.apply (new JoinFunction
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
DataStream
DataStream
orangeStream.join(greenStream)
.where(
.equalTo(
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
DataStream
DataStream
orangeStream
.keyBy(
.intervalJoin(greenStream.keyBy(
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction
public void processElement(Integer left, Integer right, Context ctx,
Collector
out.collect(first + "," + second);
}
});