作者:哲纸 | 来源:互联网 | 2023-09-12 14:55
第四章
时间概念与Watermark
flink根据时间产生的位置不同,把时间区分为三种时间概念
事件生成时间(event Time)
数据从终端产生,或者从系统中产生的时间
事件接入时间(Ingestion Time)
数据经过中间件传入到flink之后,在dataSource中接入的时候会生成时间接入时间。
事件处理时间(Processing Time)
数据在各个算子实例执行转换操作过程中,算子实例所在系统的时间为数据处理时间。
解决问题:为什么要区分这三种时间?与流式计算有何关系?
事件时间
可以在flink系统中指定事件时间属性或者设定时间提取器来提取事件时间。
所有进入到flink流式系统处理的事件,时间都是在外部系统中产生,然后经过网络进入到flink系统内处理的。所以所有进入到flink系统的事件,时间都是在外部系统中产生,经过网络进入到flink内部进行处理的。在理论情况下,事件时间对应的时间戳一定会早于在flink系统中产生的时间戳,但在实际情况中往往会出现数据记录乱序,延迟到达等问题。
eventTime的意义就在于,能够借助于时间产生时候的时间来还原事件的先后关系。
接入时间
数据进入flink系统的时间。
依赖于source operator 所在主机的系统时钟。
Ingestion Time 介于Event Time 和 Process Time之间,相对于Process Time,其生成的代价相对高。后续数据处理Operator所在机器的时钟没有关系,从而不会因为某台机器时钟不同步或网络时延而导致计算结果不准确的问题。
相比于Event Time,Ingestion Time不能处理乱序事件,所以也就不用生成对应的Watermarks。
处理时间
数据在操作算子计算过程中获取到的所在主机时间。用户选择用Processing Time,所有和时间相关的计算算子,例如windows计算,会直接使用其所在主机的系统时间。
processing time定义的目的:flink程序性能比较高,延时也相对比较低,对接入到系统中的数据时间相关计算完全交给算子内部决定。窗口计算依赖的时间都是在具体算子运行的过程中产生,不需要做时间上的对比与协调。
但是processing time不擅长处理数据乱序情况。在分布式系统中,数据本身不乱序,如果每台机器的时间不同步,可能导致数据处理过程中数据乱序的问题。
应用:对时间计算精度要求不高的场景。例如统计某些延时非常高的日志数据。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
eventTime和watermark
水位线究竟有什么作用?
通常由于网络等影响,事件数据往往不能及时传输到flink中,导致数据乱序到达或者延迟到达。
因此需要一种能够控制数据处理的过程和速度的机制 ,用来保证乱序事件中计算结果的正确性。
比如基于事件时间的Window创建后,具体该如何确定属于该Window的数据元素已经全部到达。如果确定全部到达,就可以对Window的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。
作用原理:
会使用读取进入系统的最新事件时间减去固定的时间间隔作为watermark,这个时间间隔为用户外部配置的支持最大延迟到达时间长度。如果事件超过这个时间间隔到达,就被认为迟到事件或者异常事件。
简单来讲,当事件接入到Flink系统时,会在Sources Operator中根据当前最新事件时间产生Watermarks时间戳,记为X,进入到Flink系统中的数据事件时间,记为Y,如果Y
从另外一个角度讲,需要保证所有进入到window的数据元素满足其事件时间Y>=X ,才能触发对窗口内元素的计算。
否则窗口会等待watermark大于窗口结束时间。
编程实践
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L; // 3.5 seconds
var currentMaxTimestamp: Long;
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): LOng= {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
程序中有一个extractTimestamp方法,就是根据数据本身的Event time来获取;还有一个getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness来获取的。
下面是根据eventTime最大值作为时间水印的实例代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.expressions.E;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.*;
public class watermarkTest {
/**
* 基于事件序列最大值
* @param args
*/
public static void main(String[]args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);
int port = 9091;
DataStream text = env.socketTextStream("localhost",port,"n");
//input map
/**
* input String -> (String,Long)
* input -> (description,timestamp)
*/
DataStream> inputMap = text.map(new MapFunction>() {
@Override
public Tuple2 map(String s) throws Exception {
String [] arr = s.split(",");
return new Tuple2<>(arr[0],Long.parseLong(arr[1]));
}
});
//赋予时间戳然后生成水位线
DataStream> watermarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {
Long currentMaxTimeStamp = 0L;
final Long maxOutOfOrderness = 10000L;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimeStamp-maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple2 element, long previousTimeStamp) {
long timestamp = element.f1;
currentMaxTimeStamp = Math.max(currentMaxTimeStamp,timestamp);
assert getCurrentWatermark() != null;
System.out.println("键值 :"+element.f0+",事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
sdf.format(currentMaxTimeStamp)+" ],水印时间:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]");
return timestamp;
}
});
DataStream window = watermarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new WindowFunction, String, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable> iterable, Collector collector) throws Exception {
/*
*对window内数据进行排序,保证数据排序
*用list保存迭代流所有数据,然后排序
*/
String key = tuple.toString();
List arrayList = new ArrayList();
Iterator> it = iterable.iterator();
while (it.hasNext()) {
Tuple2 next = it.next();
arrayList.add(next.f1);
}
Collections.sort(arrayList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = "n 键值 : "+ key + "n 触发窗内数据个数 : " + arrayList.size() + "n 触发窗起始数据: " + sdf.format(arrayList.get(0)) + "n 触发窗最后(可能是延时)数据:" +
sdf.format(arrayList.get(arrayList.size() - 1))
+ "n 实际窗起始和结束时间: " + sdf.format(timeWindow.getStart()) + "《----》" + sdf.format(timeWindow.getEnd()) + " n n ";
collector.collect(result);
}
});
window.print();
env.execute("eventtime-watermark");
}
}
分析之前,先牢记下列事实:
1.按照信号发生的顺序,时间是不断增加的,所以在时间序列上若出现事件时间小于时间序列最大值,一般都是延时的事件,时间序列最大值不会改变。
2.每处理一条事件数据,watermark时间就生成一次,后面窗的触发就是依据水印时间。若设置乱序延时为10s,则生成规则就是:水印时间 = 当前最大时间戳 - 最大允许延迟时间
3. 触发条件:
水位线时间 > 窗口结束时间
收集一些对watermark的通俗理解
1.接受的数据就相当于浮在水面的物体,水位线的高度只会升高不会降低,每当一个新数据进来时,会重新计算水位线时间,但是计算结果小于当前水位线时间,则不会更新现有的水位线。 当水位线到达窗口触发时间时才会触发窗口的计算。
watermark的意义在于数据无序传递的时候有一定容错率,如果晚来的数据在容错范围之内,会当做正常传递来处理。
2.接受的数据的时间在水位线以下就可以正常接受,水位线不会更新,超过水位线的,水位线会更新,计算按照水位线为准.
ref
摘自:《Flink原理、实战与性能优化》 — 张利兵
在豆瓣阅读书店查看:https://read.douban.com/ebook/114289022/
本作品由华章数媒授权豆瓣阅读全球范围内电子版制作与发行。
© 版权所有,侵权必究。
秦凯新技术博客
https://juejin.im/post/5bf95810e51d452d705fef33