复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:
(1)如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的 Map 中;
(2)如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction 的 processMatch()方法进行处理;
(3)如果当前事件不符合模式匹配的条件,就丢弃该事件;
(4)如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。
不过在有时间限制的情况下,需要考虑的问题会有一点特别。比如我们用.within()指定了模式检测的时间间隔,超出这个时间当前这组检测就应该失败了。然而这种“超时失败”跟真正的“匹配失败”不同,它其实是一种“部分成功匹配”;因为只有在开头能够正常匹配的前提下,没有等到后续的匹配事件才会超时。所以往往不应该直接丢弃,而是要输出一个提示或报警信息。这就要求我们有能力捕获并处理超时事件。
在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接 口 , 叫作TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。所以这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。
举例:
class MyPatternProcessFunction extends PatternProcessFunction <Event, String> implements TimedOutPartialMatchHandler <Event> {// 正常匹配事件的处理&#64;Overridepublic void processMatch(Map <String, List <Event>> match, Context ctx, Collector <String> out) throws Exception {...}// 超时部分匹配事件的处理 &#64;Overridepublic void processTimedOutMatch(Map <String, List <Event>> match, Context ctx) throws Exception {Event startEvent &#61; match.get("start").get(0);OutputTag <Event> outputTag &#61; new OutputTag <Event> ("time-out") {};ctx.output(outputTag, startEvent);}}
可以在 processTimedOutMatch()方法中定义了一个输出标签&#xff08;OutputTag&#xff09;。调用 ctx.output()方法&#xff0c;就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。
PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力&#xff0c;这是官方推荐的做法。此外&#xff0c;Flink CEP 中也保留了简化版的PatternSelectFunction&#xff0c;它无法直接处理超时事件&#xff0c;不过我们可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法&#xff0c;同样会将部分匹配的事件放在一个 Map 中作为参数传入&#xff0c;此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后&#xff0c;可以将通知或报警信息输出。由于调用.select()方法后会得到唯一的 DataStream&#xff0c;所以正常匹配事件和超时事件的处理结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的 DataStream&#xff0c;而超时事件的处理结果则会进入侧输出流&#xff1b;这个侧输出流需要另外传入一个侧输出标签&#xff08;OutputTag&#xff09;来指定。
所以最终我们在调用 PatternStream 的.select()方法时需要传入三个参数&#xff1a;侧输出流标签&#xff08; OutputTag &#xff09;&#xff0c; 超 时 事 件 处 理 函 数 PatternTimeoutFunction &#xff0c; 匹 配 事 件 提 取 函 数PatternSelectFunction。
下面是一个代码中的调用方式&#xff1a;
// 定义一个侧输出流标签&#xff0c;用于标识超时侧输出流
OutputTag < String > timeoutTag &#61; new OutputTag <String> ("timeout") {};
// 将匹配到的&#xff0c;和超时部分匹配的复杂事件提取出来&#xff0c;然后包装成提示信息输出
SingleOutputStreamOperator <String> resultStream &#61; patternStream.select(timeoutTag,// 超时部分匹配事件的处理new PatternTimeoutFunction <Event, String> () {&#64;Overridepublic String timeout(Map <String, List <Event>> pattern, long timeoutTimestamp) throws Exception {Event event &#61; pattern.get("start").get(0);return "超时&#xff1a;" &#43; event.toString();}},// 正常匹配事件的处理new PatternSelectFunction <Event, String> () {&#64;Overridepublic String select(Map <String, List <Event>> pattern) throws Exception {...}});
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");
这里需要注意的是&#xff0c;在超时事件处理的过程中&#xff0c;从 Map 里只能取到已经检测到匹配的那些事件&#xff1b;如果取可能未匹配的事件并调用它的对象方法&#xff0c;则可能会报空指针异常&#xff08;NullPointerException&#xff09;。另外&#xff0c;超时事件处理的结果进入侧输出流&#xff0c;正常匹配事件的处理结果进入主流&#xff0c;两者的数据类型可以不同。
应用案例在电商平台中&#xff0c;最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求&#xff0c;但在现实中&#xff0c;并不是每次下单都会被用户立刻支付。当拖延一段时间后&#xff0c;用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率&#xff0c;同时也为了防范订单支付环节的安全风险&#xff0c;电商网站往往会对订单状态进行监控&#xff0c;设置一个失效时间&#xff08;比如 15分钟&#xff09;&#xff0c;如果下单后一段时间仍未支付&#xff0c;订单就会被取消。
首先定义出要处理的数据类型。我们面对的是订单事件&#xff0c;主要包括用户对订单的创建&#xff08;下单&#xff09;和支付两种行为。因此可以定义 POJO 类 OrderEvent 如下&#xff0c;其中属性字段包括用户 ID、订单 ID、事件类型&#xff08;操作类型&#xff09;以及时间戳。
订单事件源代码
public class OrderEvent {public String userId; //用户IDpublic String orderId; //订单IDpublic String eventType; //事件类型&#xff08;操作类型&#xff09;public Long timestamp; //时间戳public OrderEvent() {}public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {this.userId &#61; userId;this.orderId &#61; orderId;this.eventType &#61; eventType;this.timestamp &#61; timestamp;}&#64;Overridepublic String toString() {return "OrderEvent[" &#43;"userId&#61;&#39;" &#43; userId &#43; &#39;\&#39;&#39; &#43;"orderId&#61;&#39;" &#43; orderId &#43; &#39;\&#39;&#39; &#43;", eventType&#61;&#39;" &#43; eventType &#43; &#39;\&#39;&#39; &#43;", timestamp&#61;" &#43; timestamp &#43;&#39;]&#39;;}
}
当前需求的重点在于对超时未支付的用户进行监控提醒&#xff0c;也就是需要检测有下单行为、但15 分钟内没有支付行为的复杂事件。在下单和支付之间&#xff0c;可以有其他操作&#xff08;比如对订单的修改&#xff09;&#xff0c;所以两者之间是宽松近邻关系。
重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单 ID 进行分组&#xff0c;然后检测每个订单的“下单-支付”复杂事件&#xff0c;如果出现超时事件需要输出报警提示信息。
整体代码实现如下&#xff1a;
public class OrderTimeoutDetectExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1.获取订单事件流&#xff0c;并提取时间戳、生成水位线SingleOutputStreamOperator<OrderEvent> orderStream &#61; env.fromElements(new OrderEvent("user_1", "order_1", "create", 1000L),new OrderEvent("user_2", "order_2", "create", 2000L),new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {&#64;Overridepublic long extractTimestamp(OrderEvent element, long recordTimestamp) {return element.timestamp;}}));//2.定义模式Pattern<OrderEvent, OrderEvent> pattern &#61; Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() { //首先是下单事件&#64;Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("create");}}).followedBy("pay").where(new SimpleCondition<OrderEvent>() { //其次是支付事件,中间可以修改订单,宽松近邻&#64;Overridepublic boolean filter(OrderEvent value) throws Exception {return value.eventType.equals("pay");}}).within(Time.minutes(15));// 要求在十五分钟之内完成//3.将应用模式应用到订单流上,检测匹配的复杂事件PatternStream<OrderEvent> patternStream &#61; CEP.pattern(orderStream.keyBy(event -> event.orderId), pattern);//4.定义一个侧输出流标签OutputTag<String> timeoutTag &#61; new OutputTag<String>("timeout"){};//5.将完全匹配和超时部分匹配的复杂事件提取出来&#xff0c;进行处理SingleOutputStreamOperator<String> result &#61; patternStream.process(new OrderPayMatch());// 将正常匹配和超时部分匹配的处理结果流打印输出result.print("正常支付");result.getSideOutput(timeoutTag).print("timeout");env.execute();}//实现自定义的 PatternProcessFunction&#xff0c;需实现 TimedOutPartialMatchHandler 接口public static class OrderPayMatch extends PatternProcessFunction<OrderEvent,String> implements TimedOutPartialMatchHandler<OrderEvent>{// 处理正常匹配事件&#64;Overridepublic void processMatch(Map<String, List<OrderEvent>> match, Context context, Collector<String> out) throws Exception {//获取当前的支付事件OrderEvent payEvent &#61; match.get("pay").get(0);out.collect("用户" &#43; payEvent.userId &#43; "的订单&#xff1a;" &#43; payEvent.orderId &#43; " 已支付&#xff01;");}//处理超时未支付事件&#64;Overridepublic void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context context) throws Exception {OrderEvent createEvent &#61; match.get("create").get(0);OutputTag<String> timeoutTag &#61; new OutputTag<String>("timeout"){};context.output(timeoutTag,"用户" &#43; createEvent.userId &#43; "的订单 &#xff1a;" &#43; createEvent.orderId &#43; " 超时未支付&#xff01;");}}
}
运行代码&#xff0c;控制台打印结果如下&#xff1a;
订单 1 和订单 3 都在 15 分钟进行了支付&#xff0c;订单 1 中间的修改行为不会影响结果&#xff1b;而订单 2 未能支付&#xff0c;因此侧输出流输出了一条报警信息。且同一用户可以下多个订单&#xff0c;最后的判断只是基于同一订单做出的。这与我们预期的效果完全一致。用处理函数进行状态编程&#xff0c;结合定时器也可以实现同样的功能&#xff0c;但明显 CEP 的实现更加方便&#xff0c;也更容易迁移和扩展。
Gitee上的源代码
处理迟到数据CEP 主要处理的是先后发生的一组复杂事件&#xff0c;所以事件的顺序非常关键。事件先后顺序的具体定义与时间语义有关。如果是处理时间语义&#xff0c;那比较简单&#xff0c;只要按照数据处理的系统时间算就可以了&#xff1b;而如果是事件时间语义&#xff0c;需要按照事件自身的时间戳来排序。这就有可能出现时间戳大的事件先到、时间戳小的事件后到的现象&#xff0c;也就是所谓的“乱序数据” 或“迟到数据”。 在 Flink CEP 中沿用了通过设置水位线&#xff08;watermark&#xff09;延迟来处理乱序数据的做法。
当一个事件到来时&#xff0c;并不会立即做检测匹配处理&#xff0c;而是先放入一个缓冲区&#xff08;buffer&#xff09;。缓冲区内的数据&#xff0c;会按照时间戳由小到大排序&#xff1b;当一个水位线到来时&#xff0c;就会将缓冲区中所有时间戳小于水位线的事件依次取出&#xff0c;进行检测匹配。这样就保证了匹配事件的顺序和事件时间的进展一致&#xff0c;处理的顺序就一定是正确的。这里水位线的延迟时间&#xff0c;也就是事件在缓冲区等待的最大时间。
这样又会带来另一个问题&#xff1a;水位线延迟时间不可能保证将所有乱序数据完美包括进来&#xff0c;总会有一些事件延迟比较大&#xff0c;以至于等它到来的时候水位线早已超过了它的时间戳。这时之前的数据都已处理完毕&#xff0c;这样的“迟到数据”就只能被直接丢弃了——这与窗口对迟到数据的默认处理一致。
如果不希望迟到数据丢掉&#xff0c;应该也可以借鉴窗口的做法。Flink CEP 同样提 供 了 将 迟 到 事 件 输 出 到 侧 输 出 流 的 方 式 &#xff1a; 我 们 可 以 基 于 PatternStream 直接调用.sideOutputLateData()方法&#xff0c;传入一个 OutputTag&#xff0c;将迟到数据放入侧输出流另行处理。代码中调用方式如下&#xff1a;
PatternStream <Event> patternStream &#61; CEP.pattern(input, pattern);
// 定义一个侧输出流的标签
OutputTag <String> lateDataOutputTag &#61; new OutputTag <String> ("late-data") {};
// 将迟到数据输出到侧输出流
SingleOutputStreamOperator <ComplexEvent> result &#61; patternStream.sideOutputLateData(lateDataOutputTag) .select( // 处理正常匹配数据new PatternSelectFunction <Event, ComplexEvent> () {...});
// 从结果中提取侧输出流
DataStream <String> lateData &#61; result.getSideOutput(lateDataOutputTag);
可以看到&#xff0c;整个处理流程与窗口非常相似。经处理匹配数据得到结果数据流之后&#xff0c;可以调用.getSideOutput()方法来提取侧输出流&#xff0c;捕获迟到数据进行额外处理。