热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

聊聊flink的SlidingWindow

序本文主要研究一下flink的SlidingWindowSlidingEventTimeWindowsflink-streaming-java_2.11-1.7.0-sources

本文主要研究一下flink的Sliding Window

SlidingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java

@PublicEvolving
public class SlidingEventTimeWindows extends WindowAssigner {private static final long serialVersionUID &#61; 1L;private final long size;private final long slide;private final long offset;protected SlidingEventTimeWindows(long size, long slide, long offset) {if (offset <0 || offset >&#61; slide || size <&#61; 0) {throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <&#61; offset 0");}this.size &#61; size;this.slide &#61; slide;this.offset &#61; offset;}&#64;Overridepublic Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {if (timestamp > Long.MIN_VALUE) {List windows &#61; new ArrayList<>((int) (size / slide));long lastStart &#61; TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start &#61; lastStart;start > timestamp - size;start -&#61; slide) {windows.add(new TimeWindow(start, start &#43; size));}return windows;} else {throw new RuntimeException("Record has Long.MIN_VALUE timestamp (&#61; no timestamp marker). " &#43;"Is the time characteristic set to &#39;ProcessingTime&#39;, or did you forget to call " &#43;"&#39;DataStream.assignTimestampsAndWatermarks(...)&#39;?");}}public long getSize() {return size;}public long getSlide() {return slide;}&#64;Overridepublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create();}&#64;Overridepublic String toString() {return "SlidingEventTimeWindows(" &#43; size &#43; ", " &#43; slide &#43; ")";}public static SlidingEventTimeWindows of(Time size, Time slide) {return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);}public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),offset.toMilliseconds() % slide.toMilliseconds());}&#64;Overridepublic TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {return new TimeWindow.Serializer();}&#64;Overridepublic boolean isEventTime() {return true;}
}

  • SlidingEventTimeWindows继承了Window&#xff0c;其中元素类型为Object&#xff0c;而窗口类型为TimeWindow&#xff1b;它有三个参数&#xff0c;一个是size&#xff0c;一个是slide&#xff0c;一个是offset&#xff0c;其中offset必须大于等于0&#xff0c;offset必须大于slide&#xff0c;size必须大于0
  • assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart&#xff0c;然后以为start &#43; size > timestamp为循环条件&#xff0c;每次对start减去slide&#xff0c;挨个计算TimeWindow(start, start &#43; size)&#xff1b;getDefaultTrigger方法返回的是EventTimeTrigger&#xff1b;getWindowSerializer方法返回的是TimeWindow.Serializer()&#xff1b;isEventTime返回的为true
  • SlidingEventTimeWindows提供了of静态工厂方法&#xff0c;可以指定size、slide及offset参数&#xff0c;它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值

SlidingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

public class SlidingProcessingTimeWindows extends WindowAssigner {private static final long serialVersionUID &#61; 1L;private final long size;private final long offset;private final long slide;private SlidingProcessingTimeWindows(long size, long slide, long offset) {if (offset <0 || offset >&#61; slide || size <&#61; 0) {throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <&#61; offset 0");}this.size &#61; size;this.slide &#61; slide;this.offset &#61; offset;}&#64;Overridepublic Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {timestamp &#61; context.getCurrentProcessingTime();List windows &#61; new ArrayList<>((int) (size / slide));long lastStart &#61; TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);for (long start &#61; lastStart;start > timestamp - size;start -&#61; slide) {windows.add(new TimeWindow(start, start &#43; size));}return windows;}public long getSize() {return size;}public long getSlide() {return slide;}&#64;Overridepublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) {return ProcessingTimeTrigger.create();}&#64;Overridepublic String toString() {return "SlidingProcessingTimeWindows(" &#43; size &#43; ", " &#43; slide &#43; ")";}public static SlidingProcessingTimeWindows of(Time size, Time slide) {return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);}public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),offset.toMilliseconds() % slide.toMilliseconds());}&#64;Overridepublic TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {return new TimeWindow.Serializer();}&#64;Overridepublic boolean isEventTime() {return false;}
}

  • SlidingProcessingTimeWindows继承了Window&#xff0c;其中元素类型为Object&#xff0c;而窗口类型为TimeWindow&#xff1b;它有三个参数&#xff0c;一个是size&#xff0c;一个是slide&#xff0c;一个是offset&#xff0c;其中offset必须大于等于0&#xff0c;offset必须大于slide&#xff0c;size必须大于0
  • assignWindows方法以slide作为size通过TimeWindow.getWindowStartWithOffset(timestamp, offset, slide)计算lastStart(与SlidingEventTimeWindows不同的是SlidingProcessingTimeWindows的这个方法里头使用context.getCurrentProcessingTime()值重置了timestamp)&#xff0c;然后以为start &#43; size > timestamp为循环条件&#xff0c;每次对start减去slide&#xff0c;挨个计算TimeWindow(start, start &#43; size)&#xff1b;getDefaultTrigger方法返回的是ProcessingTimeTrigger&#xff1b;getWindowSerializer方法返回的是TimeWindow.Serializer()&#xff1b;isEventTime返回的为false
  • SlidingEventTimeWindows提供了of静态工厂方法&#xff0c;可以指定size、slide及offset参数&#xff0c;它对于传入的offset参数转为毫秒然后与slide.toMilliseconds()取余作为最后的offset值

小结

  • flink的Sliding Window分为SlidingEventTimeWindows及SlidingProcessingTimeWindows&#xff0c;它们都继承了WindowAssigner&#xff0c;其中元素类型为Object&#xff0c;而窗口类型为TimeWindow&#xff1b;它有三个参数&#xff0c;一个是size&#xff0c;一个是slide&#xff0c;一个是offset&#xff0c;其中offset必须大于等于0&#xff0c;offset必须大于slide&#xff0c;size必须大于0
  • WindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法&#xff0c;同时定义了抽象静态类WindowAssignerContext&#xff1b;它有两个泛型&#xff0c;其中T为元素类型&#xff0c;而W为窗口类型&#xff1b;SlidingEventTimeWindows及SlidingProcessingTimeWindows的窗口类型为TimeWindow&#xff0c;它有start及end属性&#xff0c;其中start为inclusive&#xff0c;而end为exclusive&#xff0c;maxTimestamp返回的是end-1&#xff0c;它还提供了mergeWindows及getWindowStartWithOffset静态方法&#xff1b;前者用于合并重叠的时间窗口&#xff0c;后者用于获取指定timestamp、offset、windowSize的window start
  • SlidingEventTimeWindows及SlidingProcessingTimeWindows的不同在于assignWindows、getDefaultTrigger、isEventTime方法&#xff1b;前者assignWindows使用的是参数中的timestamp&#xff0c;而后者使用的是context.getCurrentProcessingTime()&#xff1b;前者的getDefaultTrigger返回的是EventTimeTrigger&#xff0c;而后者返回的是ProcessingTimeTrigger&#xff1b;前者isEventTime方法返回的为true&#xff0c;而后者返回的为false

doc

  • Sliding Windows



推荐阅读
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 优化后的标题:深入探讨网关安全:将微服务升级为OAuth2资源服务器的最佳实践
    本文深入探讨了如何将微服务升级为OAuth2资源服务器,以订单服务为例,详细介绍了在POM文件中添加 `spring-cloud-starter-oauth2` 依赖,并配置Spring Security以实现对微服务的保护。通过这一过程,不仅增强了系统的安全性,还提高了资源访问的可控性和灵活性。文章还讨论了最佳实践,包括如何配置OAuth2客户端和资源服务器,以及如何处理常见的安全问题和错误。 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • 在多年使用Java 8进行新应用开发和现有应用迁移的过程中,我总结了一些非常实用的技术技巧。虽然我不赞同“最佳实践”这一术语,因为它可能暗示了通用的解决方案,但这些技巧在实际项目中确实能够显著提升开发效率和代码质量。本文将深入解析并探讨这四大高级技巧的具体应用,帮助开发者更好地利用Java 8的强大功能。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Maven Web项目创建时JSP文件常见错误及解决方案
    Maven Web项目创建时JSP文件常见错误及解决方案 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • 使用 ListView 浏览安卓系统中的回收站文件 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 在使用 Qt 进行 YUV420 图像渲染时,由于 Qt 本身不支持直接绘制 YUV 数据,因此需要借助 QOpenGLWidget 和 OpenGL 技术来实现。通过继承 QOpenGLWidget 类并重写其绘图方法,可以利用 GPU 的高效渲染能力,实现高质量的 YUV420 图像显示。此外,这种方法还能显著提高图像处理的性能和流畅性。 ... [详细]
  • 本文探讨了在任务完成后将其转换为最终状态时的异常处理机制。通过分析 `TaskCompletionSource` 的使用场景,详细阐述了其在异步编程中的重要作用,并提供了具体的实现方法和注意事项,帮助开发者更好地理解和应用这一技术。 ... [详细]
  • 在PHP中实现腾讯云接口签名,以完成人脸核身功能的对接与签名配置时,需要注意将文档中的POST请求改为GET请求。具体步骤包括:使用你的`secretKey`生成签名字符串`$srcStr`,格式为`GET faceid.tencentcloudapi.com?`,确保参数正确拼接,避免因请求方法错误导致的签名问题。此外,还需关注API的其他参数要求,确保请求的完整性和安全性。 ... [详细]
  • 该问题可能由守护进程配置不当引起,例如未识别的JVM选项或内存分配不足。建议检查并调整JVM参数,确保为对象堆预留足够的内存空间(至少1572864KB)。此外,还可以优化应用程序的内存使用,减少不必要的内存消耗。 ... [详细]
author-avatar
糖在嘴里甜在心离_636
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有