在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
在事件时间语义下,我们对于时间的衡量是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中数据流是乱序的。在这种情況下,就不能简单地把数据自带的时间戳当作时钟了,而是需要用另外的标志来表示事件时间的进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。在实际中,我们更关系事件时间。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点, 主要内容就是一个时间戳,用来指示当前的事件时间。
而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
Watermarks(t)
表示当前流中事件时间已经达到 t
,表示 t
之前的数据都已经到达了。在理想状态下,数据处理的过程会保持原先的顺序,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳可以保证总是从小到大的,水位线也是不断增长的,事件时钟也不断向前推进。
在实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时候每来一条数据就提取时间戳,插入水位线,就会做很多无用功。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳。
“乱序”(out-of-order)是指数据的先后顺序不一致,主要是基于数据的产 生时间而言的。
插入新的水位线时,要先判断一下当前时间戳是否比之前的大,否则就不再生成新的水位线。
如果要考虑到大量数据同时到来的处理效率,我们可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线。
但是周期性会带来一个问题,我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间。也就是用当前己有数据的最大时间戳减去等待时间,就是要插入的水位线的时间戳。
Flink 中的水位线是流处理中对低延迟和结果正确性的一个权衡机制。
在 DataStream API 中,有一个单独用于生成水位线的方法 assignTimestampsAndWatermarks()
,它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。
public SingleOutputStreamOperator
该方法需要传入水位线生成策略 WatermarkStrategy
,它包含了时间戳分配器(TimestampAssigner)和水位线生成器(WatermarkGenerator)。
public interface WatermarkStrategy
@Override
public TimestampAssigner
return this.assigner;
}
@Override
WatermarkGenerator
}
onEvent()
和 onPredicEmit()
两个方法。setAutoWatermarkInterva()
方法来设置,默认为 200ms。// 有序流的Watermark生成
DataStream
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Bob", "./home", 3000L),
new Event("Mary", "./cart", 2000L))
.assignTimestampsAndWatermarks(WatermarkStrategy.
.withTimestampAssigner(new SerializableTimestampAssigner
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
// 无序流的Watermark生成
DataStream
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Bob", "./home", 3000L),
new Event("Mary", "./cart", 2000L))
.assignTimestampsAndWatermarks(WatermarkStrategy.
.withTimestampAssigner(new SerializableTimestampAssigner
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的心数据块”进行处理,这就是所谓的“窗口”。
在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗 口。相比之下,我们应该把窗口理解成一个“桶”,窗口可以把流切割成有限大小的多个“存储桶”(bucket)。每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
Flink 中窗口并不是静态准备好的,而是动态创建的。当有落在这个窗口区间范围的数据到达时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时, 窗口就触发计算并关闭。
区别:在调用窗口算子之前,是否有 keyBy
操作。
keyBy
操作后,数据流会按照 key 被分为多条逻辑流,这 就是 KeyedStream。基于 KeyedStrearn 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。stream.keyBy().window()
keyBy
,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务上执行,就相当于并行度变成了1。stream.windowAll()
窗口操作主要有两个部分:窗口分配器和窗口函数。
stream.keyBy(
// 指明了窗口的类型
.window(
// 定义窗口具体的处理逻辑
.aggregate(
定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。
window()
方法,需要传入一个 WindowAssigner
作为参数,返回WindowedStream
。windowAll()
方法,同样传入一个 WindowAssigner
,返回的是 AllWindowedStream
。// 滑动计数:第一个参数计数个数,第二个参数是滑动个数大小
stream.keyBy(event -> event.user).countWindow(10, 2);
// 滚动事件时间窗口
stream.keyBy(event -> event.user).window(TumblingEventTimeWindows.of(Time.hours(1)));
// 滑动事件时间窗口:第一个参数时间窗口大小,第二个参数是滑动时间大小
stream.keyBy(event -> event.user).window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));
// 事件时间会话窗口
stream.keyBy(event -> event.user).window(EventTimeSessionWindows.withGap(Time.seconds(2)));
窗口将数据收集起来,最基本的处理操作就是进行聚合。每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
SingleOutputStreamOperator
.assignTimestampsAndWatermarks(WatermarkStrategy.
.withTimestampAssigner(new SerializableTimestampAssigner
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.map(event -> Tuple2.of(event.user, 1L))
.returns(new TypeHint
.keyBy(tuple -> tuple.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce((t0, t1) -> Tuple2.of(t0.f0, t0.f1 + t1.f1))
.print();
createAccumulator()
:创建一个累加器,就是为聚合创建一个初始状态,每个聚合任务只调用一次。add()
:将输入的元素添加到累加器中。getResult()
:从累加器中提取聚合的输出结果。merge()
:合并两个累加器,并将合并后的状态作为一个累加器返回。stream.keyBy(event -> event.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new AggregateFunction
@Override
public Tuple2
return Tuple2.of(0L, 0);
}
@Override
public Tuple2
return Tuple2.of(longIntegerTuple2.f0 + event.timestamp, longIntegerTuple2.f1 + 1);
}
@Override
public String getResult(Tuple2
Timestamp timestamp = new Timestamp(longIntegerTuple2.f0 / longIntegerTuple2.f1);
return timestamp.toString();
}
@Override
public Tuple2
return Tuple2.of(longIntegerTuple2.f0 + acc1.f0, longIntegerTuple2.f1 + acc1.f1);
}
}).print();
与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
apply
方法,传入一个 WindowFunction 的实现类。stream.keyBy(event -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UVCountByWindow())
.print();
// 自定义ProcessWindowFunction
public static class UVCountByWindow extends ProcessWindowFunction
@Override
public void process(Boolean aBoolean, ProcessWindowFunction
HashSet
for(Event event: iterable) {
userSet.add(event.user);
}
Integer uv = userSet.size();
// 结合窗口信息输出
Long start = context.window().getStart();
Long end = context.window().getEnd();
collector.collect("窗口 " + new Timestamp(start) + "-" + new Timestamp(end) + "UV=" + uv);
}
}
触发器主要是用来控制窗口什么时候触发计算(执行窗口函数)。
stream.keyBy(
// 指明了窗口的类型
.window(
// 定义自定义触发器
.trigger(new MyTrigger())
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
onElement()
:窗口中每到来一个元素,都会调用这个方法。onEventTime()
:当注册的事件时间计时器触发时,将调用这个方法。onProcessingTime()
:当注册的处理时间计时器触发时,将调用这个方法。clear()
:当窗口关闭销毁时,调用这个方法,一般用来清除自定义的状态。前三个方法返回的都是 TriggerResult
类型,这是一个枚举类型,其中定义了对窗口进行操作的四种类型。
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用 evictor
方法,就可以传入一个自定义的移除器。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(
// 指明了窗口的类型
.window(
// 定义自定义移除器
.trigger(new MyEvictor())
Evictor 接口定义了两个接口:
evictBefore()
:定义执行窗口函数之前的移除数据操作evictAfter()
:定义执行窗口函数之后的移除数据操作默认情况下,预实现的移除器都是在执行窗口函数之前移除数据的。
在事件时间语义下,窗口中可能会出现数据迟到的情況,因此,Flink 提供了一个特殊的接口,可以为窗口算子设置一个允许的最大延迟。
基于 WindowedStream 调用 allowedLateness()
方法,传入一个 Time
类型的延迟时间,就可以表示允许这段时间内的延迟数据。
stream.keyBy(
// 指明了窗口的类型
.window(TumblingEventTimeWindows.of(Time.hours(1)))
// 允许延迟1分钟
.allowedLateness(Time.minutes(1))
另一种处理迟到数据的方法:将迟到的数据存入到侧输出流中。
基于 WindowedStream 调用 sideOutputLateData()
方法,该方法需要传入一个输出标签,用来标记分支的迟到数据。
OutputTag
stream.keyBy(
// 指明了窗口的类型
.window(TumblingEventTimeWindows.of(Time.hours(1)))
// 侧输出流
.sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的 DataStream,调用 getSideoutput()
方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。
DataStream