热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【Flink】处理函数Process

目录处理函数基本处理函数ProcessFunction处理函数的功能ProcessFunction解析处理函数的分类按键分区处理函数KeyedProcessFunction定时器T


目录

  • 处理函数
    • 基本处理函数 ProcessFunction
      • 处理函数的功能
      • ProcessFunction解析
      • 处理函数的分类
    • 按键分区处理函数 KeyedProcessFunction
      • 定时器Timer 和定时服务 TimerService
    • 窗口处理函数
    • 应用案例 TopN
    • 侧输出流

整体框架:
在这里插入图片描述


处理函数


基本处理函数 ProcessFunction


处理函数的功能

ProcessFunction:能拿到别的API拿不到的东西。

处理函数提供了一个定时服务TimerService,可以用它访问流中的事件event、时间戳timestamp,水位线watermark,注册定时事件。

处理函数继承自AbstractRichFunction,拥有富函数类的特性,可以访问状态state和其他运行时信息。
处理函数可以直接将数据输出到侧输出流side output中。

处理函数是DataStream API的底层逻辑。


ProcessFunction解析

ProcessFunction类中:
1. processElement(输入类型,上下文,输入类型Collector)上下文中能获取到:时间戳、侧输出流、"timerService"timerService中能获取到:currentProcessingTime处理时间、currentWatermark事件时间、registerProcessingTimeTimer注册定时器、删除定时器。2. onTimer()注册了定时器后,到点时会触发这个回调,这是定时到了后的处理方法。但是,只有基于KeyedStream才能定义定时器。

处理函数的分类

1. ProcessFunction
2. KeyedProcessFunction----"重要"
3. ProcessWindowFunction
4. ProcessAllWindowFunction
5. CoProcessFunction
6. ProcessJoinFunction
7. BroadcastProcessFunction
8. KeyedBroadcastProcessFunction

按键分区处理函数 KeyedProcessFunction

stream.keyBy().process(new MyKeyedProcessFunction())

定时器Timer 和定时服务 TimerService

处理时间:

stream.keyBy(data->data.user).process(new KeyedProcessFunction<String, Event, String>(){&#64;Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception(){Long currTs &#61; ctx.timerService().currentProcessingTime();out.collect(ctx.getCurrentKey() &#43; " 数据到达&#xff0c;到达时间&#xff1a;" &#43; new Timestamp(currTs));// 注册一个10s后的定时器ctx.timerService().registerProcessingTimeTimer(currTs &#43; 10 * 1000L);}&#64;Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception() {out.collect(ctx.getCurrentKey() &#43; " 定时器触发&#xff0c;触发时间&#xff1a;"&#43;new Timestamp(timestamp));}})

事件时间&#xff1a;

stream.keyBy(data->data.user).process(new KeyedProcessFunction<String, Event, String>(){&#64;Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception(){Long currTs &#61; ctx.timestamp();out.collect(ctx.getCurrentKey() &#43; " 数据到达&#xff0c;时间戳&#xff1a;" &#43; new Timestamp(currTs) &#43; " watermark&#xff1a;"&#43; ctx.timerService().currentWatermark());// 注册一个10s后的定时器ctx.timerService().registerProcessingTimeTimer(currTs &#43; 10 * 1000L);}&#64;Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception() {out.collect(ctx.getCurrentKey() &#43; " 定时器触发&#xff0c;触发时间&#xff1a;"&#43;new Timestamp(timestamp) &#43; " watermark&#xff1a;"&#43; ctx.timerService().currentWatermark());}})

窗口处理函数

窗口处理函数的使用
ProcessWindowFunction解析


应用案例 TopN

要求&#xff1a;统计每隔10s的最受欢迎的URL的前两名&#xff0c;每隔5s更新一次结果。

使用ProcessAllWindowFunction
该方法数据量大的时候&#xff0c;把所有数据放在一个窗口里&#xff0c;不靠谱。

使用KeyedProcessFunction

// 1.按照URL分组&#xff0c;统计窗口内每个URL的访问量
SingleOutputStreamOperator<UrlViewCount> urlCountStream &#61; stream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).aggregate(new UrlCountViewExample.UrlViewCountAgg(), new UrlCountViewExample.xxxx());// 2.对于同一窗口统计出的访问量&#xff0c;进行收集和排序
urlCountStream.keyBy(data -> data.windowEnd) // 窗口关闭时钟.process(new TopNProcessResult(2)).print();// 实现TopNProcessResult
public static class TopNProcessResult extends KeyedProcessFunction<Long, UrlViewCount, String>{// 定义属性Nprivate int n;// 定义列表状态private ListState<UrlViewCount> urlViewCountListState;public TopNProcessResult(int n){this.n &#61; n;}// 在运行环境中获取状态&#64;Overridepublic void open(Configuration parameters) throws Exception{urlViewCountListState &#61; getRuntimeContext().getListState(new ListStateDescriptor<UrlViewCount>("url-count-list", Types.POJO(UrlViewCount.class)) ;);}&#64;Overridepublic void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception{// 来了数据后&#xff0c;将数据保存到状态中urlViewCountListState.add(value);// 注册windowEnd &#43; 1ms 的定时器ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() &#43; 1); }&#64;Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception{// 从状态中获取数据ArrayList<UrlViewCount> urlViewCountArrayList &#61; new ArrayList();for(UrlViewCount u : urlViewCountListState.get()){urlViewCountArrayList.add(u);}// 排序urlViewCountArrayList.sort(new Comparator<UrlViewCount>(){&#64;Overridepublic int compare(UrlViewCount o1, UrlViewCount o2){return o2.count.intValue() - o1.count.intValue();}});// 包装信息打印输出StringBuilder result &#61; new StringBuilder();result.append("窗口结束时间&#xff1a;" &#43; new Timestamp(ctx.getCurrentKey()));// 取前两个for(int i&#61;0; i < 2; i&#43;&#43;){UrlViewCount currTuple &#61; urlViewCountArrayList.get(i);String info &#61; "No. "&#43;(i&#43;1)&#43;" "&#43; "url: "&#43; currTuple.url &#43; " "&#43; "访问量&#xff1a; "&#43; currTuple.count &#43; "\n";result.append(info);}out.collect(result.toString());}
}

侧输出流

可以用来做分流操作

// 定义侧输出流标签
OutputTag<String> outputTag &#61; new OutputTag<String>("side-output"){};public void processElement(){// 转换成Long&#xff0c; 输出到主流中out.collect(Long.valueof(value));// 转换成String&#xff0c; 输出到侧输出流中ctx.output(outputTag, String.valueof(value));
}// 获得侧输出流
DataStream<String> stringStream &#61; longStream.getSideOutput(outputTag);

推荐阅读
  • 深入理解Flink的水印机制
    本文详细探讨了Apache Flink框架中的水印机制,这是一种用于处理数据流中时间不一致问题的重要工具。通过介绍水印的工作原理及其在实际应用中的实现方式,帮助读者更好地理解和利用这一功能。 ... [详细]
  • 本文探讨了如何在PHP与MySQL环境中实现高效的分页查询,包括基本的分页实现、性能优化技巧以及高级的分页策略。 ... [详细]
  • 理解浏览器历史记录(2)hashchange、pushState
    阅读目录1.hashchange2.pushState本文也是一篇基础文章。继上文之后,本打算去研究pushState,偶然在一些信息中发现了锚点变 ... [详细]
  • 基于SSM框架的在线考试系统:随机组卷功能详解
    本文深入探讨了基于SSM(Spring, Spring MVC, MyBatis)框架构建的在线考试系统中,随机组卷功能的设计与实现方法。 ... [详细]
  • ASP.NET 进度条实现详解
    本文介绍了如何在ASP.NET中使用HTML和JavaScript创建一个动态更新的进度条,并通过Default.aspx页面进行展示。 ... [详细]
  • 本文探讨了如何通过优化 DOM 操作来提升 JavaScript 的性能,包括使用 `createElement` 函数、动画元素、理解重绘事件及处理鼠标滚动事件等关键主题。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • C# 中创建和执行存储过程的方法
    本文详细介绍了如何使用 C# 创建和调用 SQL Server 存储过程,包括连接数据库、定义命令类型、设置参数等步骤。 ... [详细]
  • 本文介绍了如何通过 XMLHttpRequest 对象在不同浏览器中实现 AJAX 的 POST 和 GET 请求,并详细说明了 XMLHttpRequest 的五个状态及其含义。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 探讨了在HTML表单中使用元素代替进行表单提交的方法。 ... [详细]
  • 利用Node.js实现PSD文件的高效切图
    本文介绍了如何通过Node.js及其psd2json模块,快速实现PSD文件的自动化切图过程,以适应项目中频繁的界面更新需求。此方法不仅提高了工作效率,还简化了从设计稿到实际应用的转换流程。 ... [详细]
  • 在尝试将 mysqldump 文件加载到新的 MySQL 服务器时,遇到因使用保留关键字 'table' 导致的语法错误。 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
  • 本文详细介绍了在 Oracle 数据库中使用 MyBatis 实现增删改查操作的方法。针对查询操作,文章解释了如何通过创建字段映射来处理数据库字段风格与 Java 对象之间的差异,确保查询结果能够正确映射到持久层对象。此外,还探讨了插入、更新和删除操作的具体实现及其最佳实践,帮助开发者高效地管理和操作 Oracle 数据库中的数据。 ... [详细]
author-avatar
黄宗翰琼琦莉雯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有