热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkCEP(八)模式的检测处理

文章目录处理超时事件使用PatternProcessFunction的侧输出流使用PatternTimeoutFunction应用案例处理迟到数据处理超时事件复杂事件的检测结果一

文章目录

  • 处理超时事件
    • 使用 PatternProcessFunction 的侧输出流
    • 使用 PatternTimeoutFunction
  • 应用案例
  • 处理迟到数据


处理超时事件

复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:
(1)如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的 Map 中;
(2)如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction 的 processMatch()方法进行处理;
(3)如果当前事件不符合模式匹配的条件,就丢弃该事件;
(4)如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。

不过在有时间限制的情况下,需要考虑的问题会有一点特别。比如我们用.within()指定了模式检测的时间间隔,超出这个时间当前这组检测就应该失败了。然而这种“超时失败”跟真正的“匹配失败”不同,它其实是一种“部分成功匹配”;因为只有在开头能够正常匹配的前提下,没有等到后续的匹配事件才会超时。所以往往不应该直接丢弃,而是要输出一个提示或报警信息。这就要求我们有能力捕获并处理超时事件。

使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接 口 , 叫作TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。所以这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

举例:

class MyPatternProcessFunction extends PatternProcessFunction <Event, String> implements TimedOutPartialMatchHandler <Event> {// 正常匹配事件的处理&#64;Overridepublic void processMatch(Map <String, List <Event>> match, Context ctx, Collector <String> out) throws Exception {...}// 超时部分匹配事件的处理 &#64;Overridepublic void processTimedOutMatch(Map <String, List <Event>> match, Context ctx) throws Exception {Event startEvent &#61; match.get("start").get(0);OutputTag <Event> outputTag &#61; new OutputTag <Event> ("time-out") {};ctx.output(outputTag, startEvent);}}

可以在 processTimedOutMatch()方法中定义了一个输出标签&#xff08;OutputTag&#xff09;。调用 ctx.output()方法&#xff0c;就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。

使用 PatternTimeoutFunction

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力&#xff0c;这是官方推荐的做法。此外&#xff0c;Flink CEP 中也保留了简化版的PatternSelectFunction&#xff0c;它无法直接处理超时事件&#xff0c;不过我们可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法&#xff0c;同样会将部分匹配的事件放在一个 Map 中作为参数传入&#xff0c;此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后&#xff0c;可以将通知或报警信息输出。由于调用.select()方法后会得到唯一的 DataStream&#xff0c;所以正常匹配事件和超时事件的处理结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的 DataStream&#xff0c;而超时事件的处理结果则会进入侧输出流&#xff1b;这个侧输出流需要另外传入一个侧输出标签&#xff08;OutputTag&#xff09;来指定。

所以最终我们在调用 PatternStream 的.select()方法时需要传入三个参数&#xff1a;侧输出流标签&#xff08; OutputTag &#xff09;&#xff0c; 超 时 事 件 处 理 函 数 PatternTimeoutFunction &#xff0c; 匹 配 事 件 提 取 函 数PatternSelectFunction。

下面是一个代码中的调用方式&#xff1a;

// 定义一个侧输出流标签&#xff0c;用于标识超时侧输出流
OutputTag < String > timeoutTag &#61; new OutputTag <String> ("timeout") {};
// 将匹配到的&#xff0c;和超时部分匹配的复杂事件提取出来&#xff0c;然后包装成提示信息输出
SingleOutputStreamOperator <String> resultStream &#61; patternStream.select(timeoutTag,// 超时部分匹配事件的处理new PatternTimeoutFunction <Event, String> () {&#64;Overridepublic String timeout(Map <String, List <Event>> pattern, long timeoutTimestamp) throws Exception {Event event &#61; pattern.get("start").get(0);return "超时&#xff1a;" &#43; event.toString();}},// 正常匹配事件的处理new PatternSelectFunction <Event, String> () {&#64;Overridepublic String select(Map <String, List <Event>> pattern) throws Exception {...}});
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");

这里需要注意的是&#xff0c;在超时事件处理的过程中&#xff0c;从 Map 里只能取到已经检测到匹配的那些事件&#xff1b;如果取可能未匹配的事件并调用它的对象方法&#xff0c;则可能会报空指针异常&#xff08;NullPointerException&#xff09;。另外&#xff0c;超时事件处理的结果进入侧输出流&#xff0c;正常匹配事件的处理结果进入主流&#xff0c;两者的数据类型可以不同。

应用案例

在电商平台中&#xff0c;最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求&#xff0c;但在现实中&#xff0c;并不是每次下单都会被用户立刻支付。当拖延一段时间后&#xff0c;用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率&#xff0c;同时也为了防范订单支付环节的安全风险&#xff0c;电商网站往往会对订单状态进行监控&#xff0c;设置一个失效时间&#xff08;比如 15分钟&#xff09;&#xff0c;如果下单后一段时间仍未支付&#xff0c;订单就会被取消。

首先定义出要处理的数据类型。我们面对的是订单事件&#xff0c;主要包括用户对订单的创建&#xff08;下单&#xff09;和支付两种行为。因此可以定义 POJO 类 OrderEvent 如下&#xff0c;其中属性字段包括用户 ID、订单 ID、事件类型&#xff08;操作类型&#xff09;以及时间戳。

订单事件源代码

public class OrderEvent {public String userId; //用户IDpublic String orderId; //订单IDpublic String eventType; //事件类型&#xff08;操作类型&#xff09;public Long timestamp; //时间戳public OrderEvent() {}public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {this.userId &#61; userId;this.orderId &#61; orderId;this.eventType &#61; eventType;this.timestamp &#61; timestamp;}&#64;Overridepublic String toString() {return "OrderEvent[" &#43;"userId&#61;&#39;" &#43; userId &#43; &#39;\&#39;&#39; &#43;"orderId&#61;&#39;" &#43; orderId &#43; &#39;\&#39;&#39; &#43;", eventType&#61;&#39;" &#43; eventType &#43; &#39;\&#39;&#39; &#43;", timestamp&#61;" &#43; timestamp &#43;&#39;]&#39;;}
}

当前需求的重点在于对超时未支付的用户进行监控提醒&#xff0c;也就是需要检测有下单行为、但15 分钟内没有支付行为的复杂事件。在下单和支付之间&#xff0c;可以有其他操作&#xff08;比如对订单的修改&#xff09;&#xff0c;所以两者之间是宽松近邻关系。

重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单 ID 进行分组&#xff0c;然后检测每个订单的“下单-支付”复杂事件&#xff0c;如果出现超时事件需要输出报警提示信息。

整体代码实现如下&#xff1a;

public class OrderTimeoutDetectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.获取订单事件流&#xff0c;并提取时间戳、生成水位线SingleOutputStreamOperator<OrderEvent> orderStream &#61; env.fromElements(new OrderEvent("user_1", "order_1", "create", 1000L),new OrderEvent("user_2", "order_2", "create", 2000L),new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {&#64;Overridepublic long extractTimestamp(OrderEvent element, long recordTimestamp) {return element.timestamp;}}));//2.定义模式Pattern<OrderEvent, OrderEvent> pattern &#61; Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() { //首先是下单事件&#64;Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("create");}}).followedBy("pay").where(new SimpleCondition<OrderEvent>() { //其次是支付事件,中间可以修改订单,宽松近邻&#64;Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("pay");}}).within(Time.minutes(15));// 要求在十五分钟之内完成//3.将应用模式应用到订单流上,检测匹配的复杂事件PatternStream<OrderEvent> patternStream &#61; CEP.pattern(orderStream.keyBy(event -> event.orderId), pattern);//4.定义一个侧输出流标签OutputTag<String> timeoutTag &#61; new OutputTag<String>("timeout"){};//5.将完全匹配和超时部分匹配的复杂事件提取出来&#xff0c;进行处理SingleOutputStreamOperator<String> result &#61; patternStream.process(new OrderPayMatch());// 将正常匹配和超时部分匹配的处理结果流打印输出result.print("正常支付");result.getSideOutput(timeoutTag).print("timeout");env.execute();}//实现自定义的 PatternProcessFunction&#xff0c;需实现 TimedOutPartialMatchHandler 接口public static class OrderPayMatch extends PatternProcessFunction<OrderEvent,String> implements TimedOutPartialMatchHandler<OrderEvent>{// 处理正常匹配事件&#64;Overridepublic void processMatch(Map<String, List<OrderEvent>> match, Context context, Collector<String> out) throws Exception {//获取当前的支付事件OrderEvent payEvent &#61; match.get("pay").get(0);out.collect("用户" &#43; payEvent.userId &#43; "的订单&#xff1a;" &#43; payEvent.orderId &#43; " 已支付&#xff01;");}//处理超时未支付事件&#64;Overridepublic void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context context) throws Exception {OrderEvent createEvent &#61; match.get("create").get(0);OutputTag<String> timeoutTag &#61; new OutputTag<String>("timeout"){};context.output(timeoutTag,"用户" &#43; createEvent.userId &#43; "的订单 &#xff1a;" &#43; createEvent.orderId &#43; " 超时未支付&#xff01;");}}
}

运行代码&#xff0c;控制台打印结果如下&#xff1a;
在这里插入图片描述

订单 1 和订单 3 都在 15 分钟进行了支付&#xff0c;订单 1 中间的修改行为不会影响结果&#xff1b;而订单 2 未能支付&#xff0c;因此侧输出流输出了一条报警信息。且同一用户可以下多个订单&#xff0c;最后的判断只是基于同一订单做出的。这与我们预期的效果完全一致。用处理函数进行状态编程&#xff0c;结合定时器也可以实现同样的功能&#xff0c;但明显 CEP 的实现更加方便&#xff0c;也更容易迁移和扩展。

Gitee上的源代码

处理迟到数据

CEP 主要处理的是先后发生的一组复杂事件&#xff0c;所以事件的顺序非常关键。事件先后顺序的具体定义与时间语义有关。如果是处理时间语义&#xff0c;那比较简单&#xff0c;只要按照数据处理的系统时间算就可以了&#xff1b;而如果是事件时间语义&#xff0c;需要按照事件自身的时间戳来排序。这就有可能出现时间戳大的事件先到、时间戳小的事件后到的现象&#xff0c;也就是所谓的“乱序数据” 或“迟到数据”。 在 Flink CEP 中沿用了通过设置水位线&#xff08;watermark&#xff09;延迟来处理乱序数据的做法。

当一个事件到来时&#xff0c;并不会立即做检测匹配处理&#xff0c;而是先放入一个缓冲区&#xff08;buffer&#xff09;。缓冲区内的数据&#xff0c;会按照时间戳由小到大排序&#xff1b;当一个水位线到来时&#xff0c;就会将缓冲区中所有时间戳小于水位线的事件依次取出&#xff0c;进行检测匹配。这样就保证了匹配事件的顺序和事件时间的进展一致&#xff0c;处理的顺序就一定是正确的。这里水位线的延迟时间&#xff0c;也就是事件在缓冲区等待的最大时间。

这样又会带来另一个问题&#xff1a;水位线延迟时间不可能保证将所有乱序数据完美包括进来&#xff0c;总会有一些事件延迟比较大&#xff0c;以至于等它到来的时候水位线早已超过了它的时间戳。这时之前的数据都已处理完毕&#xff0c;这样的“迟到数据”就只能被直接丢弃了——这与窗口对迟到数据的默认处理一致。

如果不希望迟到数据丢掉&#xff0c;应该也可以借鉴窗口的做法。Flink CEP 同样提 供 了 将 迟 到 事 件 输 出 到 侧 输 出 流 的 方 式 &#xff1a; 我 们 可 以 基 于 PatternStream 直接调用.sideOutputLateData()方法&#xff0c;传入一个 OutputTag&#xff0c;将迟到数据放入侧输出流另行处理。代码中调用方式如下&#xff1a;

PatternStream <Event> patternStream &#61; CEP.pattern(input, pattern);
// 定义一个侧输出流的标签
OutputTag <String> lateDataOutputTag &#61; new OutputTag <String> ("late-data") {};
// 将迟到数据输出到侧输出流
SingleOutputStreamOperator <ComplexEvent> result &#61; patternStream.sideOutputLateData(lateDataOutputTag) .select( // 处理正常匹配数据new PatternSelectFunction <Event, ComplexEvent> () {...});
// 从结果中提取侧输出流
DataStream <String> lateData &#61; result.getSideOutput(lateDataOutputTag);

可以看到&#xff0c;整个处理流程与窗口非常相似。经处理匹配数据得到结果数据流之后&#xff0c;可以调用.getSideOutput()方法来提取侧输出流&#xff0c;捕获迟到数据进行额外处理。


推荐阅读
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Html5-Canvas实现简易的抽奖转盘效果
    本文介绍了如何使用Html5和Canvas标签来实现简易的抽奖转盘效果,同时使用了jQueryRotate.js旋转插件。文章中给出了主要的html和css代码,并展示了实现的基本效果。 ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 本文介绍了机器学习手册中关于日期和时区操作的重要性以及其在实际应用中的作用。文章以一个故事为背景,描述了学童们面对老先生的教导时的反应,以及上官如在这个过程中的表现。同时,文章也提到了顾慎为对上官如的恨意以及他们之间的矛盾源于早年的结局。最后,文章强调了日期和时区操作在机器学习中的重要性,并指出了其在实际应用中的作用和意义。 ... [详细]
author-avatar
kelly最爱梁君诺_795
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有