热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink原理,实战与性能优化读书笔记(三)三种时间&水位线机制

Flink原理,实战与性能优化读书笔记(三)三种时间&水位线机制,Go语言社区,Golang程序员人脉社

第四章

时间概念与Watermark

flink根据时间产生的位置不同,把时间区分为三种时间概念

  • 事件生成时间(event Time)
    数据从终端产生,或者从系统中产生的时间
  • 事件接入时间(Ingestion Time)
    数据经过中间件传入到flink之后,在dataSource中接入的时候会生成时间接入时间。
  • 事件处理时间(Processing Time)
    数据在各个算子实例执行转换操作过程中,算子实例所在系统的时间为数据处理时间。

在这里插入图片描述解决问题:为什么要区分这三种时间?与流式计算有何关系?

事件时间

可以在flink系统中指定事件时间属性或者设定时间提取器来提取事件时间。

所有进入到flink流式系统处理的事件,时间都是在外部系统中产生,然后经过网络进入到flink系统内处理的。所以所有进入到flink系统的事件,时间都是在外部系统中产生,经过网络进入到flink内部进行处理的。在理论情况下,事件时间对应的时间戳一定会早于在flink系统中产生的时间戳,但在实际情况中往往会出现数据记录乱序,延迟到达等问题。
eventTime的意义就在于,能够借助于时间产生时候的时间来还原事件的先后关系。

接入时间

数据进入flink系统的时间。

依赖于source operator所在主机的系统时钟。

Ingestion Time 介于Event Time 和 Process Time之间,相对于Process Time,其生成的代价相对高。后续数据处理Operator所在机器的时钟没有关系,从而不会因为某台机器时钟不同步或网络时延而导致计算结果不准确的问题。

相比于Event Time,Ingestion Time不能处理乱序事件,所以也就不用生成对应的Watermarks。

处理时间

数据在操作算子计算过程中获取到的所在主机时间。用户选择用Processing Time,所有和时间相关的计算算子,例如windows计算,会直接使用其所在主机的系统时间。

processing time定义的目的:flink程序性能比较高,延时也相对比较低,对接入到系统中的数据时间相关计算完全交给算子内部决定。窗口计算依赖的时间都是在具体算子运行的过程中产生,不需要做时间上的对比与协调。

但是processing time不擅长处理数据乱序情况。在分布式系统中,数据本身不乱序,如果每台机器的时间不同步,可能导致数据处理过程中数据乱序的问题。

应用:对时间计算精度要求不高的场景。例如统计某些延时非常高的日志数据。

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
       

eventTime和watermark

水位线究竟有什么作用?

通常由于网络等影响,事件数据往往不能及时传输到flink中,导致数据乱序到达或者延迟到达。

因此需要一种能够控制数据处理的过程和速度的机制,用来保证乱序事件中计算结果的正确性。

比如基于事件时间的Window创建后,具体该如何确定属于该Window的数据元素已经全部到达。如果确定全部到达,就可以对Window的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理。

作用原理:

会使用读取进入系统的最新事件时间减去固定的时间间隔作为watermark,这个时间间隔为用户外部配置的支持最大延迟到达时间长度。如果事件超过这个时间间隔到达,就被认为迟到事件或者异常事件。

简单来讲,当事件接入到Flink系统时,会在Sources Operator中根据当前最新事件时间产生Watermarks时间戳,记为X,进入到Flink系统中的数据事件时间,记为Y,如果Y

从另外一个角度讲,需要保证所有进入到window的数据元素满足其事件时间Y>=X ,才能触发对窗口内元素的计算。
否则窗口会等待watermark大于窗口结束时间。

编程实践

/**
 * This generator generates watermarks assuming that elements come out of order to a certain degree only.
 * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
 * elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
 
    val maxOutOfOrderness = 3500L; // 3.5 seconds
 
    var currentMaxTimestamp: Long;
 
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): LOng= {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }
 
    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

程序中有一个extractTimestamp方法,就是根据数据本身的Event time来获取;还有一个getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness来获取的。

下面是根据eventTime最大值作为时间水印的实例代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.expressions.E;
import org.apache.flink.util.Collector;


import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.*;


public class watermarkTest {
    /**
     * 基于事件序列最大值
     * @param args
     */
    public static void main(String[]args)throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);
        int port = 9091;
        DataStream text = env.socketTextStream("localhost",port,"n");

        //input map
        /**
         * input String -> (String,Long)
         *        input ->  (description,timestamp)
         */

        DataStream> inputMap = text.map(new MapFunction>() {
            @Override
            public Tuple2 map(String s) throws Exception {
                String [] arr = s.split(",");
                return new Tuple2<>(arr[0],Long.parseLong(arr[1]));
            }
        });


        //赋予时间戳然后生成水位线
        DataStream> watermarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {

            Long currentMaxTimeStamp = 0L;
            final Long maxOutOfOrderness = 10000L;

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimeStamp-maxOutOfOrderness);
            }

            @Override
            public long extractTimestamp(Tuple2 element, long previousTimeStamp) {

                long timestamp = element.f1;
                currentMaxTimeStamp = Math.max(currentMaxTimeStamp,timestamp);
                assert getCurrentWatermark() != null;
                System.out.println("键值 :"+element.f0+",事件事件:[ "+sdf.format(element.f1)+" ],currentMaxTimestamp:[ "+
                        sdf.format(currentMaxTimeStamp)+" ],水印时间:[ "+sdf.format(getCurrentWatermark().getTimestamp())+" ]");
                return timestamp;
            }
        });


        DataStream window = watermarkStream.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .apply(new WindowFunction, String, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable> iterable, Collector collector) throws Exception {

                        /*
                         *对window内数据进行排序,保证数据排序
                         *用list保存迭代流所有数据,然后排序
                         */
                        String key = tuple.toString();
                        List arrayList = new ArrayList();
                        Iterator> it = iterable.iterator();
                        while (it.hasNext()) {
                            Tuple2 next = it.next();
                            arrayList.add(next.f1);
                        }
                        Collections.sort(arrayList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "n 键值 : "+ key + "n              触发窗内数据个数 : " + arrayList.size() + "n              触发窗起始数据: " + sdf.format(arrayList.get(0)) + "n              触发窗最后(可能是延时)数据:" +
                                sdf.format(arrayList.get(arrayList.size() - 1))
                                + "n              实际窗起始和结束时间: " + sdf.format(timeWindow.getStart()) + "《----》" + sdf.format(timeWindow.getEnd()) + " n n ";

                        collector.collect(result);
                    }
                });

        window.print();
        env.execute("eventtime-watermark");
    }

}

分析之前,先牢记下列事实:

1.按照信号发生的顺序,时间是不断增加的,所以在时间序列上若出现事件时间小于时间序列最大值,一般都是延时的事件,时间序列最大值不会改变。
2.每处理一条事件数据,watermark时间就生成一次,后面窗的触发就是依据水印时间。若设置乱序延时为10s,则生成规则就是:水印时间 = 当前最大时间戳 - 最大允许延迟时间
3. 触发条件:

水位线时间 > 窗口结束时间

收集一些对watermark的通俗理解

1.接受的数据就相当于浮在水面的物体,水位线的高度只会升高不会降低,每当一个新数据进来时,会重新计算水位线时间,但是计算结果小于当前水位线时间,则不会更新现有的水位线。 当水位线到达窗口触发时间时才会触发窗口的计算。
watermark的意义在于数据无序传递的时候有一定容错率,如果晚来的数据在容错范围之内,会当做正常传递来处理。

2.接受的数据的时间在水位线以下就可以正常接受,水位线不会更新,超过水位线的,水位线会更新,计算按照水位线为准.

ref

摘自:《Flink原理、实战与性能优化》 — 张利兵
在豆瓣阅读书店查看:https://read.douban.com/ebook/114289022/
本作品由华章数媒授权豆瓣阅读全球范围内电子版制作与发行。
© 版权所有,侵权必究。

秦凯新技术博客
https://juejin.im/post/5bf95810e51d452d705fef33


推荐阅读
  • 本文回顾了作者在求职阿里和腾讯实习生过程中,从最初的迷茫到最后成功获得Offer的心路历程。文中不仅分享了个人的面试经历,还提供了宝贵的面试准备建议和技巧。 ... [详细]
  • Java虚拟机及其发展历程
    Java虚拟机(JVM)是每个Java开发者日常工作中不可或缺的一部分,但其背后的运作机制却往往显得神秘莫测。本文将探讨Java及其虚拟机的发展历程,帮助读者深入了解这一关键技术。 ... [详细]
  • 问题描述现在,不管开发一个多大的系统(至少我现在的部门是这样的),都会带一个日志功能;在实际开发过程中 ... [详细]
  • 本文详细介绍如何在SSM(Spring + Spring MVC + MyBatis)框架中实现分页功能。包括分页的基本概念、数据准备、前端分页栏的设计与实现、后端分页逻辑的编写以及最终的测试步骤。 ... [详细]
  • 【MySQL】frm文件解析
    官网说明:http:dev.mysql.comdocinternalsenfrm-file-format.htmlfrm是MySQL表结构定义文件,通常frm文件是不会损坏的,但是如果 ... [详细]
  • 本文详细介绍了如何使用C#实现不同类型的系统服务账户(如Windows服务、计划任务和IIS应用池)的密码重置方法。 ... [详细]
  • 视觉Transformer综述
    本文综述了视觉Transformer在计算机视觉领域的应用,从原始Transformer出发,详细介绍了其在图像分类、目标检测和图像分割等任务中的最新进展。文章不仅涵盖了基础的Transformer架构,还深入探讨了各类增强版Transformer模型的设计思路和技术细节。 ... [详细]
  • Vue CLI 基础入门指南
    本文详细介绍了 Vue CLI 的基础使用方法,包括环境搭建、项目创建、常见配置及路由管理等内容,适合初学者快速掌握 Vue 开发环境。 ... [详细]
  • 本文探讨了Python类型注解使用率低下的原因,主要归结于历史背景和投资回报率(ROI)的考量。文章不仅分析了类型注解的实际效用,还回顾了Python类型注解的发展历程。 ... [详细]
  • 深入理解Flink的水印机制
    本文详细探讨了Apache Flink框架中的水印机制,这是一种用于处理数据流中时间不一致问题的重要工具。通过介绍水印的工作原理及其在实际应用中的实现方式,帮助读者更好地理解和利用这一功能。 ... [详细]
  • 汇编语言:编程世界的始祖,连C语言都敬畏三分!
    当C语言还在萌芽阶段时,它首次接触到了汇编语言,并对其简洁性感到震惊。尽管汇编语言的指令极其简单,但它却是所有现代编程语言的基础,其重要性不言而喻。 ... [详细]
  • 汇总了2023年7月7日最新的网络安全新闻和技术更新,包括最新的漏洞披露、工具发布及安全事件。 ... [详细]
  • 本文介绍如何使用R语言中的相关包来解析和转换搜狗细胞词库(.scel格式),并将其导出为CSV文件,以便于后续的数据分析和文本挖掘任务。 ... [详细]
  • Go语言实现文件读取与终端输出
    本文介绍如何使用Go语言编写程序,通过命令行参数指定文件路径,读取文件内容并将其输出到控制台。代码示例中包含了错误处理和资源管理的最佳实践。 ... [详细]
  • 本文探讨了如何在PHP与MySQL环境中实现高效的分页查询,包括基本的分页实现、性能优化技巧以及高级的分页策略。 ... [详细]
author-avatar
哲纸
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有