热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

ApacheFlink漫谈系列Watermark是个啥?

实际问题(乱序)在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流

实际问题(乱序)

在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响,比如:某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有5秒的延时,也就是在实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来(比如到Window处理节点).选具体某个delay的元素来说,假设在一个5秒的Tumble窗口(详见后续Window篇介绍),有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:



那么对于一个Count聚合的Tumble(5s)的window,上面的情况如何处理才能window2=4,window3=2 呢?


Apache Flink的时间类型

开篇我们描述的问题是一个很常见的TimeWindow中数据乱序的问题,乱序是相对于事件产生时间和到达Apache Flink 实际处理算子的顺序而言的,关于时间在Apache Flink中有如下三种时间类型,如下图:


  • ProcessingTime 

    是数据流入到具体某个算子时候相应的系统时间。ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。


  • IngestionTime

    IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。


  • EventTime

    EventTime是事件在设备上产生时候携带的。在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。


开篇描述的问题和本篇要介绍的Watermark所涉及的时间类型均是指EventTime类型。


什么是Watermark

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。Watermark的产生和Apache Flink内部处理逻辑如下图所示: 



Watermark的产生方式

目前Apache Flink 有两种生产Watermark的方式,如下:


  • Punctuated

    数据流中每一个递增的EventTime都会产生一个Watermark。 

    在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

  • Periodic

    周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。


所以Watermark的生成方式需要根据业务场景的不同进行不同的选择。


Watermark的接口定义

对应Apache Flink Watermark两种不同的生成方式,我们了解一下对应的接口定义,如下:


  • Periodic Watermarks - AssignerWithPeriodicWatermarks


    /**
    * Returns the current watermark. This method is periodically called by the
    * system to retrieve the current watermark. The method may return {@code null} to
    * indicate that no new Watermark is available.
    *
    * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp
    * is larger than that of the previously emitted watermark (to preserve the contract of
    * ascending watermarks). If the current watermark is still
    * identical to the previous one, no progress in EventTime has happened since
    * the previous call to this method. If a null value is returned, or theTimestamp
    * of the returned watermark is smaller than that of the last emitted one, then no
    * new watermark will be generated.
    *
    * <p>The interval in which this method is called and Watermarks are generated
    * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
    *
    * @see org.Apache.flink.streaming.api.watermark.Watermark
    * @see ExecutionConfig#getAutoWatermarkInterval()
    *
    * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
    */
    @Nullable
    Watermark getCurrentWatermark();


    • Punctuated Watermarks - AssignerWithPunctuatedWatermarks 


      public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {


      /**
      * Asks this implementation if it wants to emit a watermark. This method is called right after
      * the {@link #extractTimestamp(Object, long)} method.
      *
      *

      The returned watermark will be emitted only if it is non-null and its timestamp
      * is larger than that of the previously emitted watermark (to preserve the contract of
      * ascending watermarks). If a null value is returned, or the timestamp of the returned
      * watermark is smaller than that of the last emitted one, then no new watermark will
      * be generated.
      *
      *

      For an example how to use this method, see the documentation of
      * {@link AssignerWithPunctuatedWatermarks this class}.
      *
      * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
      */
      @Nullable
      Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
      }

      AssignerWithPunctuatedWatermarks 继承了TimestampAssigner接口 -TimestampAssigner


        public interface TimestampAssigner<T> extends Function {


        /**
        * Assigns a timestamp to an element, in milliseconds since the Epoch.
        *
        *

        The method is passed the previously assigned timestamp of the element.
        * That previous timestamp may have been assigned from a previous assigner,
        * by ingestion time. If the element did not carry a timestamp before, this value is
        * {@code Long.MIN_VALUE}.
        *
        * @param element The element that the timestamp will be assigned to.
        * @param previousElementTimestamp The previous internal timestamp of the element,
        * or a negative value, if no timestamp has been assigned yet.
        * @return The new timestamp.
        */
        long extractTimestamp(T element, long previousElementTimestamp);
        }

        从接口定义可以看出,Watermark可以在Event(Element)中提取EventTime,进而定义一定的计算逻辑产生Watermark的时间戳。


        Watermark解决如上问题

        从上面的Watermark生成接口和Apache Flink内部对Periodic Watermark的实现来看,Watermark的时间戳可以和Event中的EventTime 一致,也可以自己定义任何合理的逻辑使得Watermark的时间戳不等于Event中的EventTime,Event中的EventTime自产生那一刻起就不可以改变了,不受Apache Flink框架控制,而Watermark的产生是在Apache Flink的Source节点或实现的Watermark生成器计算产生(如上Apache Flink内置的 Periodic Watermark实现), Apache Flink内部对单流或多流的场景有统一的Watermark处理。


        回过头来我们在看看Watermark机制如何解决上面的问题,上面的问题在于如何将迟来的EventTime 位11的元素正确处理。要解决这个问题我们还需要先了解一下EventTime window是如何触发的?EventTime window 计算条件是当Window计算的Timer时间戳 小于等于 当前系统的Watermak的时间戳时候进行计算。 


        • 当Watermark的时间戳等于Event中携带的EventTime时候,上面场景(Watermark=EventTime)的计算结果如下:

            上面对应的DDL定义如下:

         

          create table t1(
            ts timestamp(3),
            other bigint,
            WATERMARK FOR ts AS ts
          ) with (
            'connector' = 'xx'
          )


           

          • 如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark = EventTime -5s, 如下:


            上面对应的DDL定义如下: 


            create table t1(
            ts timestamp(3),
            other bigint,
              WATERMARK FOR ts AS ts - interval '5' SECOND
            ) with (
            'connector' = 'xx'
            )


            上面正确处理的根源是我们采取了 延迟触发 window 计算 的方式正确处理了 Late Event. 与此同时,我们发现window的延时触发计算,也导致了下游的LATENCY变大,本例子中下游得到window的结果就延迟了5s.


            多流的Watermark处理

            在实际的流计算中往往一个job中会处理多个Source的数据,对Source的数据进行GroupBy分组,那么来自不同Source的相同key值会shuffle到同一个处理节点,并携带各自的Watermark,Apache Flink内部要保证Watermark要保持单调递增,多个Source的Watermark汇聚到一起时候可能不是单调自增的,这样的情况Apache Flink内部是如何处理的呢?如下图所示:


             

            Apache Flink内部实现每一个边上只能有一个递增的Watermark, 当出现多流携带Eventtime汇聚到一起(Join or Union)时候,Apache Flink会选择所有流入的Eventtime中最小min(stream1, stream2...streamN)的一个向下游流出。从而保证watermark的单调递增和保证数据的完整性.如下图:

             

             

            小结

            本节以一个流计算常见的乱序问题介绍了Apache Flink如何利用Watermark机制来处理乱序问题. 本篇内容在一定程度上也体现了EventTime Window中的Trigger机制依赖了Watermark(后续Window篇章会介绍)。Watermark机制是流计算中处理乱序,正确处理Late Event的核心手段。更多细节欢迎关注《Apache Flink 知其然,知其所以然》系列视频课程!


            订阅号&知识星球【免费】

            分享是最好的享受,予人成功是最大的成功,一个人最大的开心不源于自己会什么,而源于能让别人擅长什么,无欲无求,但予人所求! 



            More about Me...

            我坚信:

            "才者,德之资也,德者,才之帅也!"



            推荐阅读
            • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
            • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
            • 本文讨论了在使用Git进行版本控制时,如何提供类似CVS中自动增加版本号的功能。作者介绍了Git中的其他版本表示方式,如git describe命令,并提供了使用这些表示方式来确定文件更新情况的示例。此外,文章还介绍了启用$Id:$功能的方法,并讨论了一些开发者在使用Git时的需求和使用场景。 ... [详细]
            • vue使用
              关键词: ... [详细]
            • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
              本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
            • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
            • 本文介绍了brain的意思、读音、翻译、用法、发音、词组、同反义词等内容,以及脑新东方在线英语词典的相关信息。还包括了brain的词汇搭配、形容词和名词的用法,以及与brain相关的短语和词组。此外,还介绍了与brain相关的医学术语和智囊团等相关内容。 ... [详细]
            • Java序列化对象传给PHP的方法及原理解析
              本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
            • Python正则表达式学习记录及常用方法
              本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
            • 成功安装Sabayon Linux在thinkpad X60上的经验分享
              本文分享了作者在国庆期间在thinkpad X60上成功安装Sabayon Linux的经验。通过修改CHOST和执行emerge命令,作者顺利完成了安装过程。Sabayon Linux是一个基于Gentoo Linux的发行版,可以将电脑快速转变为一个功能强大的系统。除了作为一个live DVD使用外,Sabayon Linux还可以被安装在硬盘上,方便用户使用。 ... [详细]
            • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
            • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
              本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
            • 基于Socket的多个客户端之间的聊天功能实现方法
              本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
            • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
            • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
            author-avatar
            哪来的咸鱼
            这个家伙很懒,什么也没留下!
            PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
            Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有