热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink入门到实战-阶段四(时间和窗口图解)

本文主要介绍关于flink,大数据的知识点,对【Flink入门到实战-阶段四(时间和窗口图解)】和【flink入门及实战第3讲】有兴趣的朋友可以看下由【顶尖高手养成计划】投稿的技术文章,希望该技术和经

本文主要介绍关于flink,大数据的知识点,对【Flink入门到实战-阶段四(时间和窗口图解)】和【flink入门及实战第3讲】有兴趣的朋友可以看下由【顶尖高手养成计划】投稿的技术文章,希望该技术和经验能帮到你解决你所遇的flink相关技术问题。

flink入门及实战第3讲

Flink 中的时间语义

处理时间(Processing Time

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

事件时间(Event Time)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

另外,除了事件时间和处理时间, Flink 还有一个“摄入时间”( Ingestion Time )的概念, 它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是 事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添 加到数据里。这样一来,水位线( watermark )也就基于这个时间直接生成,不需要单独指定 了。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事 件时间非常像,可以当作特殊的事件时间来处理。 水位线

为什么会用到水位线?

主要是解决,分布式数据处理的真确性,如果使用处理时间,那么打比方如果8:59的数据没有在9点的时候到达,那么这个时候8:59的数据就没有办法处理,用到水位线加一定的l乱序延迟那么就可以根据事件的发生时间进行正确的处理

如果出现下游有多个并行子任务的情形,我们 只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。 有序流中的水位线(理想情况)

有序流中周期性插入水位线

乱序流中的水位线(实际情况)

乱序流

处理方式,如果后面的数据的时间要比水位线小,那么就是不用改变水位线

但是问题是处理太频繁了,所以这里我们使用的是周期性的处理一批的数据,然后得到最大的水位线用来作为现在的水位线进行往下面进行传播 

上面就会有窗口关闭的问题

 

想解决窗口什么时候关闭,那么就是用到了延迟的机制,就是水位线到2的时候,这个时候我们就是设置为0,就是2的数据到了的时候等2的水位线过来的时候再处理 

下面是一个示例,我们可以使用周期性的方式生成正确的水位线。

      第一个水位线时间戳为 7 ,它表示当前事件时间是 7 秒, 7 秒之前的数据 都已经到齐,之后再也不会有了;同样,第二个、第三个水位线时间戳分别为 12 20 ,表示 11 秒、 20 秒之前的数据都已经到齐,如果有对应的窗口就可以直接关闭了,统计的结果一定 是正确的。这里由于水位线是周期性生成的,所以插入的位置不一定是在时间戳最大的数据后 面。       另外需要注意的是,这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为 数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。 也就是说,上图中尽管水位线 W(20) 之前有时间戳为 22 的数据到来, 10~20 秒的窗口中也不 会收集这个数据,进行计算依然可以得到正确的结果。关于窗口的原理,我们会在后面继续展 开讲解。 水位线的特性

      现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础
上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
我们可以总结一下水位线的特性:
⚫ 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
⚫ 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

水位线是基于数据的时间戳生成的 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进 水位线可以通过设置延迟,来保证正确处理乱序数据 一个水位线 Watermark(t) ,表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据 水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对 乱序数据的正确处理。关于这部分内容,我们会稍后进一步展开讲解。 水位线在代码中生成 有序流
stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.
  
   forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner
   
    () { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) );
   
  
乱序流
stream.assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 5s
                        WatermarkStrategy.
  
   forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
   
    () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ).print();
   
  
水位线的传递

自己本身的水位线就是上游分区的最小的水位线对于向下游传递的时候,广播自己的水位线给下游所有的水位线 如果接收到的水位线要比现在的水位线小,那么就不改变自己的水位线 窗口

flink的窗口,根据时间的处理时间放到不同的桶里面,spark没有处理事件乱序过来的能力

窗口的分类 按照驱动类型分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取

数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类 型”。 我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”( Time Window )。这在实际应用中最常见,之前所举的例子也都是时间窗口。除了由时间驱动之外, 窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计 数窗口”( Count Window

按照窗口分配数据的规则分类

滚动窗口(Tumbling Windows

按时间,或者是信息的数量进行滚动

 滑动窗口(Sliding Windows

下面也是按时间还有信息的数量

 会话窗口(Session Windows

他是根据会话的超时时间来划分,这里就不能用计算的概念了

 全局窗口(Global Windows

要自定义一个触发器才能触发下一个窗口的操作

窗口Api 窗口分配器

时间窗口

滚动处理时间窗口
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(...)
滑动处理时间窗口
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(...)
滚动事件时间窗口
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)
滑动事件时间窗口
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(...)

计数窗口

滚动计数窗口
stream.keyBy(...)
.countWindow(10)
滑动计数窗口
stream.keyBy(...)
.countWindow(10,3)
窗口函数

流之间的装换

增量聚合函数:就是数据来一条就把结果计算出来,时间到了的时候,直接把结果往后面传递

全量聚合函数:  就是数据一批到时间在处理 

增量聚合函数reduce

pojo

public class Event {
    public String id;

    public String name;

    public Long timeStemp;

    public Event() {
    }

    public Event(String id, String name, Long timeStemp) {
        this.id = id;
        this.name = name;
        this.timeStemp = timeStemp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", timeStemp=" + timeStemp +
                '}';
    }
}

应用程序

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为5秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction
             
              >() { @Override public Tuple2
              
                reduce(Tuple2
               
                 value1, Tuple2
                
                  value2) throws Exception { return Tuple2.of(value1.f0,value1.f1+value2.f1); } }).print(); env.execute(); } }
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
预聚合函数aggregate
public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为10秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) //窗口规约聚合,这里也就是窗口里面数据的预聚合的功能 .aggregate(new AggregateFunction
             
              , Tuple2
              
               , Tuple2
               
                >() { //初始化中间的状态 @Override public Tuple2
                
                  createAccumulator() { return Tuple2.of("init",0); } //窗口里面每来一个数据就调用一次,第一个数据是新数据,第二个参数就是累加器 @Override public Tuple2
                 
                   add(Tuple2
                  
                    value, Tuple2
                   
                     accumulator) { return Tuple2.of(value.f0,accumulator.f1+value.f1); } //窗口触发的时候调用 @Override public Tuple2
                    
                      getResult(Tuple2
                     
                       accumulator) { return accumulator; } //会话窗口的时候会用到合并的功能,就是两个累加器合并,如果不是会话窗口就不用实现 @Override public Tuple2
                      
                        merge(Tuple2
                       
                         a, Tuple2
                        
                          b) { return Tuple2.of(a.f0,a.f1+b.f1); } }).print(); env.execute(); } }
                        
                       
                      
                     
                    
                   
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
全窗口函数process

就是窗口触发以后,然后所有的数据过来一块处理,上面的预聚合就是窗口里面的数据来一条处理一条,窗口触发的时候就输出结果

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为10秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // IN, OUT, KEY, W .process(new ProcessWindowFunction
             
              , String, String, TimeWindow>() { //第一个参数是key第二个参数是窗口触发以后过来的所有的数据,第三个参数是上下文对象 //第四个参数是传递给下游的收集器 @Override public void process(String s, Context context, Iterable
              
               > elements, Collector
               
                 out) throws Exception { Integer sum=0; //对于窗口里面过来的数据进行累加 for (Tuple2
                
                  element : elements) { sum += element.f1; } //得到窗口的开始时间和结束的时间 long start = context.window().getStart(); long end = context.window().getEnd(); String res="key: "+s+" start: "+start+" end: "+end+" "+"sum: "+sum; out.collect(res); } }).print(); env.execute(); } }
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  

输出的结果

11> key: a start: 1657974850000 end: 1657974860000 sum: 4
14> key: boy start: 1657974850000 end: 1657974860000 sum: 1
11> key: mary start: 1657974850000 end: 1657974860000 sum: 2
aggregate和process结合使用
public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为10秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .aggregate(new AggregateFunction
             
              , Integer, Integer>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(Tuple2
              
                value, Integer accumulator) { return accumulator + value.f1; } @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return a + b; } //第一个参数就是前面aggregate的增量聚合函数过来的值 //第二个参数就是返回的值 //第三个参数就是key //第四个参数默认TimeWindow }, new ProcessWindowFunction
               
                () { @Override public void process(String s, ProcessWindowFunction
                
                 .Context context, Iterable
                 
                   elements, Collector
                  
                    out) throws Exception { //因为窗口触发以后就是在aggregate的作用下,过来的数据就是一个那么我们直接next Integer next = elements.iterator().next(); //得到窗口的开始时间和结束的时间 long start = context.window().getStart(); long end = context.window().getEnd(); String res="key: "+s+" start: "+start+" end: "+end+" "+"sum: "+next; out.collect(res); } }).print(); env.execute(); } }
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
处理迟到数据(加深窗口和水位线的认识)

应用测试

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource
  
    socketTextStream = env.socketTextStream("master", 9999); //并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷 //由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷 //就触发不了水位线的上升 env.setParallelism(1); //第一个参数就一个名字,第二个参数用来表示事件时间 SingleOutputStreamOperator
   
    > initData = socketTextStream.map(new MapFunction
    
     >() { @Override public Tuple2
     
       map(String value) throws Exception { String[] s = value.split(" "); //假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间 return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L); } }); //设置水位线 SingleOutputStreamOperator
      
       > watermarks = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 2s WatermarkStrategy.
       
        >forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner
        
         >() { @Override public long extractTimestamp(Tuple2
         
           element, long recordTimestamp) { //指定事件时间 return element.f1; } }) ); //定义一个侧输出流的标识 OutputTag
          
           > outputTag = new OutputTag
           
            >("late") { }; SingleOutputStreamOperator
            
             > result = watermarks.keyBy(data -> data.f0) //窗口的大小为10s,注意这里是事件时间 .window(TumblingEventTimeWindows.of(Time.seconds(10))) //定义窗口关闭延迟,就是允许最大的迟到数据,由于上面设置的最大延迟为2s,在加上这个2s那么就是 //允许最大的迟到数据为4秒 .allowedLateness(Time.seconds(2)) //使用定义的标识 .sideOutputLateData(outputTag) .reduce(new ReduceFunction
             
              >() { @Override public Tuple2
              
                reduce(Tuple2
               
                 value1, Tuple2
                
                  value2) throws Exception { return Tuple2.of(value1.f0, value2.f1 + value1.f1); } }); // .aggregate();下面就可以定义处理函数进行处理 result.print("result"); //得到侧输出流的数据 DataStream
                 
                  > sideOutput = result.getSideOutput(outputTag); sideOutput.print("late"); env.execute(); } } 
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  

在linux里面使用nc

nc -lk 9999

然后输入

a 1
a 15
a 2

得到的结果

result> (a,1000)
late> (a,2000)

原理图

 

本文《Flink入门到实战-阶段四(时间和窗口图解)》版权归顶尖高手养成计划所有,引用Flink入门到实战-阶段四(时间和窗口图解)需遵循CC 4.0 BY-SA版权协议。


推荐阅读
author-avatar
人走茶凉
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有