val loginFailPattern = Pattern .begin[LoginEvent]("fail").where(_.eventType =="fail").times(3).consecutive() .within(Time.seconds(5)) //2.将模式应用到数据流上,得到一个PattenStream val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern) //3.检出符合模式的数据流,需要调用select val loginFailWarningStream = patternStream.select(new LoginFailEventMatch())
//实现自定义PatternSelectFunction class LoginFailEventMatch()extends PatternSelectFunction[LoginEvent,LoginFailWarning]{ overridedef select(pattern: util.Map[String, util.List[LoginEvent]]): LoginFailWarning ={ //当前匹配到的事件序列,就保存在Map里 // val firstFailEvent = pattern.get("firstFail").iterator().next() // val secOndFailEvent= pattern.get("secondFail").get(0) val iter = pattern.get("fail").iterator() val firstFailEvent = iter.next() val secondFailEvent = iter.next() val thirdFailEvent = iter.next() LoginFailWarning(firstFailEvent.userId,firstFailEvent.timestamp,thirdFailEvent.timestamp,"login fail") } }