热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkCEP(八)模式的检测处理

文章目录处理超时事件使用PatternProcessFunction的侧输出流使用PatternTimeoutFunction应用案例处理迟到数据处理超时事件复杂事件的检测结果一

文章目录

  • 处理超时事件
    • 使用 PatternProcessFunction 的侧输出流
    • 使用 PatternTimeoutFunction
  • 应用案例
  • 处理迟到数据


处理超时事件

复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:
(1)如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的 Map 中;
(2)如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction 的 processMatch()方法进行处理;
(3)如果当前事件不符合模式匹配的条件,就丢弃该事件;
(4)如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。

不过在有时间限制的情况下,需要考虑的问题会有一点特别。比如我们用.within()指定了模式检测的时间间隔,超出这个时间当前这组检测就应该失败了。然而这种“超时失败”跟真正的“匹配失败”不同,它其实是一种“部分成功匹配”;因为只有在开头能够正常匹配的前提下,没有等到后续的匹配事件才会超时。所以往往不应该直接丢弃,而是要输出一个提示或报警信息。这就要求我们有能力捕获并处理超时事件。

使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接 口 , 叫作TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。所以这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

举例:

class MyPatternProcessFunction extends PatternProcessFunction <Event, String> implements TimedOutPartialMatchHandler <Event> {// 正常匹配事件的处理&#64;Overridepublic void processMatch(Map <String, List <Event>> match, Context ctx, Collector <String> out) throws Exception {...}// 超时部分匹配事件的处理 &#64;Overridepublic void processTimedOutMatch(Map <String, List <Event>> match, Context ctx) throws Exception {Event startEvent &#61; match.get("start").get(0);OutputTag <Event> outputTag &#61; new OutputTag <Event> ("time-out") {};ctx.output(outputTag, startEvent);}}

可以在 processTimedOutMatch()方法中定义了一个输出标签&#xff08;OutputTag&#xff09;。调用 ctx.output()方法&#xff0c;就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。

使用 PatternTimeoutFunction

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力&#xff0c;这是官方推荐的做法。此外&#xff0c;Flink CEP 中也保留了简化版的PatternSelectFunction&#xff0c;它无法直接处理超时事件&#xff0c;不过我们可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法&#xff0c;同样会将部分匹配的事件放在一个 Map 中作为参数传入&#xff0c;此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后&#xff0c;可以将通知或报警信息输出。由于调用.select()方法后会得到唯一的 DataStream&#xff0c;所以正常匹配事件和超时事件的处理结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的 DataStream&#xff0c;而超时事件的处理结果则会进入侧输出流&#xff1b;这个侧输出流需要另外传入一个侧输出标签&#xff08;OutputTag&#xff09;来指定。

所以最终我们在调用 PatternStream 的.select()方法时需要传入三个参数&#xff1a;侧输出流标签&#xff08; OutputTag &#xff09;&#xff0c; 超 时 事 件 处 理 函 数 PatternTimeoutFunction &#xff0c; 匹 配 事 件 提 取 函 数PatternSelectFunction。

下面是一个代码中的调用方式&#xff1a;

// 定义一个侧输出流标签&#xff0c;用于标识超时侧输出流
OutputTag < String > timeoutTag &#61; new OutputTag <String> ("timeout") {};
// 将匹配到的&#xff0c;和超时部分匹配的复杂事件提取出来&#xff0c;然后包装成提示信息输出
SingleOutputStreamOperator <String> resultStream &#61; patternStream.select(timeoutTag,// 超时部分匹配事件的处理new PatternTimeoutFunction <Event, String> () {&#64;Overridepublic String timeout(Map <String, List <Event>> pattern, long timeoutTimestamp) throws Exception {Event event &#61; pattern.get("start").get(0);return "超时&#xff1a;" &#43; event.toString();}},// 正常匹配事件的处理new PatternSelectFunction <Event, String> () {&#64;Overridepublic String select(Map <String, List <Event>> pattern) throws Exception {...}});
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");

这里需要注意的是&#xff0c;在超时事件处理的过程中&#xff0c;从 Map 里只能取到已经检测到匹配的那些事件&#xff1b;如果取可能未匹配的事件并调用它的对象方法&#xff0c;则可能会报空指针异常&#xff08;NullPointerException&#xff09;。另外&#xff0c;超时事件处理的结果进入侧输出流&#xff0c;正常匹配事件的处理结果进入主流&#xff0c;两者的数据类型可以不同。

应用案例

在电商平台中&#xff0c;最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求&#xff0c;但在现实中&#xff0c;并不是每次下单都会被用户立刻支付。当拖延一段时间后&#xff0c;用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率&#xff0c;同时也为了防范订单支付环节的安全风险&#xff0c;电商网站往往会对订单状态进行监控&#xff0c;设置一个失效时间&#xff08;比如 15分钟&#xff09;&#xff0c;如果下单后一段时间仍未支付&#xff0c;订单就会被取消。

首先定义出要处理的数据类型。我们面对的是订单事件&#xff0c;主要包括用户对订单的创建&#xff08;下单&#xff09;和支付两种行为。因此可以定义 POJO 类 OrderEvent 如下&#xff0c;其中属性字段包括用户 ID、订单 ID、事件类型&#xff08;操作类型&#xff09;以及时间戳。

订单事件源代码

public class OrderEvent {public String userId; //用户IDpublic String orderId; //订单IDpublic String eventType; //事件类型&#xff08;操作类型&#xff09;public Long timestamp; //时间戳public OrderEvent() {}public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {this.userId &#61; userId;this.orderId &#61; orderId;this.eventType &#61; eventType;this.timestamp &#61; timestamp;}&#64;Overridepublic String toString() {return "OrderEvent[" &#43;"userId&#61;&#39;" &#43; userId &#43; &#39;\&#39;&#39; &#43;"orderId&#61;&#39;" &#43; orderId &#43; &#39;\&#39;&#39; &#43;", eventType&#61;&#39;" &#43; eventType &#43; &#39;\&#39;&#39; &#43;", timestamp&#61;" &#43; timestamp &#43;&#39;]&#39;;}
}

当前需求的重点在于对超时未支付的用户进行监控提醒&#xff0c;也就是需要检测有下单行为、但15 分钟内没有支付行为的复杂事件。在下单和支付之间&#xff0c;可以有其他操作&#xff08;比如对订单的修改&#xff09;&#xff0c;所以两者之间是宽松近邻关系。

重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单 ID 进行分组&#xff0c;然后检测每个订单的“下单-支付”复杂事件&#xff0c;如果出现超时事件需要输出报警提示信息。

整体代码实现如下&#xff1a;

public class OrderTimeoutDetectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.获取订单事件流&#xff0c;并提取时间戳、生成水位线SingleOutputStreamOperator<OrderEvent> orderStream &#61; env.fromElements(new OrderEvent("user_1", "order_1", "create", 1000L),new OrderEvent("user_2", "order_2", "create", 2000L),new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {&#64;Overridepublic long extractTimestamp(OrderEvent element, long recordTimestamp) {return element.timestamp;}}));//2.定义模式Pattern<OrderEvent, OrderEvent> pattern &#61; Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() { //首先是下单事件&#64;Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("create");}}).followedBy("pay").where(new SimpleCondition<OrderEvent>() { //其次是支付事件,中间可以修改订单,宽松近邻&#64;Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("pay");}}).within(Time.minutes(15));// 要求在十五分钟之内完成//3.将应用模式应用到订单流上,检测匹配的复杂事件PatternStream<OrderEvent> patternStream &#61; CEP.pattern(orderStream.keyBy(event -> event.orderId), pattern);//4.定义一个侧输出流标签OutputTag<String> timeoutTag &#61; new OutputTag<String>("timeout"){};//5.将完全匹配和超时部分匹配的复杂事件提取出来&#xff0c;进行处理SingleOutputStreamOperator<String> result &#61; patternStream.process(new OrderPayMatch());// 将正常匹配和超时部分匹配的处理结果流打印输出result.print("正常支付");result.getSideOutput(timeoutTag).print("timeout");env.execute();}//实现自定义的 PatternProcessFunction&#xff0c;需实现 TimedOutPartialMatchHandler 接口public static class OrderPayMatch extends PatternProcessFunction<OrderEvent,String> implements TimedOutPartialMatchHandler<OrderEvent>{// 处理正常匹配事件&#64;Overridepublic void processMatch(Map<String, List<OrderEvent>> match, Context context, Collector<String> out) throws Exception {//获取当前的支付事件OrderEvent payEvent &#61; match.get("pay").get(0);out.collect("用户" &#43; payEvent.userId &#43; "的订单&#xff1a;" &#43; payEvent.orderId &#43; " 已支付&#xff01;");}//处理超时未支付事件&#64;Overridepublic void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context context) throws Exception {OrderEvent createEvent &#61; match.get("create").get(0);OutputTag<String> timeoutTag &#61; new OutputTag<String>("timeout"){};context.output(timeoutTag,"用户" &#43; createEvent.userId &#43; "的订单 &#xff1a;" &#43; createEvent.orderId &#43; " 超时未支付&#xff01;");}}
}

运行代码&#xff0c;控制台打印结果如下&#xff1a;
在这里插入图片描述

订单 1 和订单 3 都在 15 分钟进行了支付&#xff0c;订单 1 中间的修改行为不会影响结果&#xff1b;而订单 2 未能支付&#xff0c;因此侧输出流输出了一条报警信息。且同一用户可以下多个订单&#xff0c;最后的判断只是基于同一订单做出的。这与我们预期的效果完全一致。用处理函数进行状态编程&#xff0c;结合定时器也可以实现同样的功能&#xff0c;但明显 CEP 的实现更加方便&#xff0c;也更容易迁移和扩展。

Gitee上的源代码

处理迟到数据

CEP 主要处理的是先后发生的一组复杂事件&#xff0c;所以事件的顺序非常关键。事件先后顺序的具体定义与时间语义有关。如果是处理时间语义&#xff0c;那比较简单&#xff0c;只要按照数据处理的系统时间算就可以了&#xff1b;而如果是事件时间语义&#xff0c;需要按照事件自身的时间戳来排序。这就有可能出现时间戳大的事件先到、时间戳小的事件后到的现象&#xff0c;也就是所谓的“乱序数据” 或“迟到数据”。 在 Flink CEP 中沿用了通过设置水位线&#xff08;watermark&#xff09;延迟来处理乱序数据的做法。

当一个事件到来时&#xff0c;并不会立即做检测匹配处理&#xff0c;而是先放入一个缓冲区&#xff08;buffer&#xff09;。缓冲区内的数据&#xff0c;会按照时间戳由小到大排序&#xff1b;当一个水位线到来时&#xff0c;就会将缓冲区中所有时间戳小于水位线的事件依次取出&#xff0c;进行检测匹配。这样就保证了匹配事件的顺序和事件时间的进展一致&#xff0c;处理的顺序就一定是正确的。这里水位线的延迟时间&#xff0c;也就是事件在缓冲区等待的最大时间。

这样又会带来另一个问题&#xff1a;水位线延迟时间不可能保证将所有乱序数据完美包括进来&#xff0c;总会有一些事件延迟比较大&#xff0c;以至于等它到来的时候水位线早已超过了它的时间戳。这时之前的数据都已处理完毕&#xff0c;这样的“迟到数据”就只能被直接丢弃了——这与窗口对迟到数据的默认处理一致。

如果不希望迟到数据丢掉&#xff0c;应该也可以借鉴窗口的做法。Flink CEP 同样提 供 了 将 迟 到 事 件 输 出 到 侧 输 出 流 的 方 式 &#xff1a; 我 们 可 以 基 于 PatternStream 直接调用.sideOutputLateData()方法&#xff0c;传入一个 OutputTag&#xff0c;将迟到数据放入侧输出流另行处理。代码中调用方式如下&#xff1a;

PatternStream <Event> patternStream &#61; CEP.pattern(input, pattern);
// 定义一个侧输出流的标签
OutputTag <String> lateDataOutputTag &#61; new OutputTag <String> ("late-data") {};
// 将迟到数据输出到侧输出流
SingleOutputStreamOperator <ComplexEvent> result &#61; patternStream.sideOutputLateData(lateDataOutputTag) .select( // 处理正常匹配数据new PatternSelectFunction <Event, ComplexEvent> () {...});
// 从结果中提取侧输出流
DataStream <String> lateData &#61; result.getSideOutput(lateDataOutputTag);

可以看到&#xff0c;整个处理流程与窗口非常相似。经处理匹配数据得到结果数据流之后&#xff0c;可以调用.getSideOutput()方法来提取侧输出流&#xff0c;捕获迟到数据进行额外处理。


推荐阅读
  • Ihavetwomethodsofgeneratingmdistinctrandomnumbersintherange[0..n-1]我有两种方法在范围[0.n-1]中生 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • 解决Only fullscreen opaque activities can request orientation错误的方法
    本文介绍了在使用PictureSelectorLight第三方框架时遇到的Only fullscreen opaque activities can request orientation错误,并提供了一种有效的解决方案。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 本指南介绍了如何在ASP.NET Web应用程序中利用C#和JavaScript实现基于指纹识别的登录系统。通过集成指纹识别技术,用户无需输入传统的登录ID即可完成身份验证,从而提升用户体验和安全性。我们将详细探讨如何配置和部署这一功能,确保系统的稳定性和可靠性。 ... [详细]
  • 本文深入解析了WCF Binding模型中的绑定元素,详细介绍了信道、信道管理器、信道监听器和信道工厂的概念与作用。从对象创建的角度来看,信道管理器负责信道的生成。具体而言,客户端的信道通过信道工厂进行实例化,而服务端则通过信道监听器来接收请求。文章还探讨了这些组件之间的交互机制及其在WCF通信中的重要性。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 本文回顾了作者初次接触Unicode编码时的经历,并详细探讨了ASCII、ANSI、GB2312、UNICODE以及UTF-8和UTF-16编码的区别和应用场景。通过实例分析,帮助读者更好地理解和使用这些编码。 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 在什么情况下MySQL的可重复读隔离级别会导致幻读现象? ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • MATLAB字典学习工具箱SPAMS:稀疏与字典学习的详细介绍、配置及应用实例
    SPAMS(Sparse Modeling Software)是一个强大的开源优化工具箱,专为解决多种稀疏估计问题而设计。该工具箱基于MATLAB,提供了丰富的算法和函数,适用于字典学习、信号处理和机器学习等领域。本文将详细介绍SPAMS的配置方法、核心功能及其在实际应用中的典型案例,帮助用户更好地理解和使用这一工具箱。 ... [详细]
  • 如何优化MySQL数据库性能以提升查询效率和系统稳定性 ... [详细]
author-avatar
kelly最爱梁君诺_795
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有