Author: 伊利纯羊毛 | Source: Internet | 2023-08-07 18:58
Contents
1 Concepts
2 Usage
  2.1 pom file
  2.2 Basic usage
  2.3 Pattern API
3 Examples
  3.1 Detecting three consecutive failed logins
  3.2 Order timeout detection
1 Concepts

One or more streams of simple events are matched against a set of rules, and the complex events that satisfy those rules are output as the data the user wants.

Characteristics:
• Goal: discover higher-level features in an ordered stream of simple events
• Input: one or more event streams made up of simple events
• Processing: identify the inherent relationships between simple events; several simple events that conform to a given rule make up a complex event
• Output: the complex events that satisfy the rules
2 Usage

2.1 pom file

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
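The examples below also assume that the Flink streaming Scala API is already on the classpath. If it is not, a dependency along the following lines is needed as well (version properties assumed to match the ones above):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>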
2.2 Basic usage

Event Streams: the login event stream
case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val loginEventStream = env
  .fromCollection(List(
    LoginEvent("1", "192.168.0.1", "fail", "1558430842"),
    LoginEvent("1", "192.168.0.2", "fail", "1558430843"),
    LoginEvent("1", "192.168.0.3", "fail", "1558430844"),
    LoginEvent("2", "192.168.10.10", "success", "1558430845")
  ))
  .assignAscendingTimestamps(_.eventTime.toLong * 1000)
2.3 Pattern API

Every Pattern consists of several steps, also called states. To move from one state to the next, we usually define conditions, as in the following code:
val loginFailPattern = Pattern.begin[LoginEvent]("begin")
  .where(_.eventType.equals("fail"))
  .next("next")
  .where(_.eventType.equals("fail"))
  .within(Time.seconds(10))
start: the first event
where: a filter condition
next: the next event, which must immediately follow (strict contiguity)
followedBy: the next event, which does not have to be adjacent (relaxed contiguity)
or: an alternative condition
within: restricts the whole match to a given time window

A combined sketch using followedBy and or is given after the subtype example below. We can also use subtype to restrict the event's subtype:
start.subtype(classOf[SubEvent]).where(...)
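As a rough sketch of the remaining operators (the step names and the "timeout" event type here are made up for illustration, reusing the LoginEvent type from above), a pattern combining followedBy, or, and within might look like this:

// Hypothetical pattern: a "fail" event, later followed (not necessarily
// immediately) by another "fail" or a "timeout" event, all within 10 seconds.
val sketchPattern = Pattern.begin[LoginEvent]("start")
  .where(_.eventType.equals("fail"))
  .followedBy("second")                  // relaxed contiguity: other events may occur in between
  .where(_.eventType.equals("fail"))
  .or(_.eventType.equals("timeout"))     // alternative condition on the same step
  .within(Time.seconds(10))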
The pattern can then be applied to a given stream to filter out the data stream we want:
// generic form
val input = ...
val pattern = ...
val patternStream = CEP.pattern(input, pattern)

// applied to the login event stream, keyed by user
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
Once we have a PatternStream, we can use select or flatSelect to extract the alert information we need from a sequence of Maps.
Using select to extract the data
The select method takes a PatternSelectFunction and uses it to output the desired warning. It receives a Map of string/event pairs, where the key is the name of the state and the event is the actual matched Event.

val loginFailDataStream = patternStream.select(
  (pattern: Map[String, Iterable[LoginEvent]]) => {
    val first = pattern.getOrElse("begin", null).iterator.next()
    val second = pattern.getOrElse("next", null).iterator.next()
    (second.userId, second.ip, second.eventType)
  })

Its return value is exactly one record per match. flatSelect implements PatternFlatSelectFunction and provides the same functionality as select; the only difference is that flatSelect can return multiple records per match.
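As a minimal sketch (assuming the same patternStream and state names as above; the output fields are chosen arbitrarily for illustration), flatSelect takes a function that receives a Collector and may emit zero or more records per match:

// Hypothetical flatSelect: emit a (userId, ip) pair for the second fail event of each match.
// Requires org.apache.flink.util.Collector on the imports.
val loginFailFlatStream = patternStream.flatSelect(
  (pattern: Map[String, Iterable[LoginEvent]], out: Collector[(String, String)]) => {
    val second = pattern.getOrElse("next", null).iterator.next()
    out.collect((second.userId, second.ip))
  })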
3 Examples

3.1 Detecting three consecutive failed logins

package org.example.cep

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.Map

object CepExample {

  case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream = env
      .fromElements(
        LoginEvent("user_1", "0.0.0.0", "fail", 1000L),
        LoginEvent("user_1", "0.0.0.1", "fail", 2000L),
        LoginEvent("user_1", "0.0.0.2", "fail", 3000L),
        LoginEvent("user_2", "0.0.0.0", "success", 4000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.userId)

    // three "fail" events in strict succession, within 10 seconds
    val pattern = Pattern
      .begin[LoginEvent]("first").where(_.eventType.equals("fail"))
      .next("second").where(_.eventType.equals("fail"))
      .next("third").where(_.eventType.equals("fail"))
      .within(Time.seconds(10))

    val patternedStream = CEP.pattern(stream, pattern)

    patternedStream.select(func).print()

    env.execute()
  }

  // build the alert message from the matched events
  val func = (pattern: Map[String, Iterable[LoginEvent]]) => {
    val first = pattern.getOrElse("first", null).iterator.next()
    val second = pattern.getOrElse("second", null).iterator.next()
    val third = pattern.getOrElse("third", null).iterator.next()
    first.userId + " failed to log in three times in a row!"
  }
}
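As a sketch of an alternative formulation (same LoginEvent type and conditions as above), the three chained next steps can also be expressed with a quantifier; in that case all three matching events end up under a single state name in the result Map:

// Hypothetical equivalent: one "fail" step repeated exactly 3 times, strictly consecutive.
val loginFailPattern = Pattern
  .begin[LoginEvent]("fail")
  .where(_.eventType.equals("fail"))
  .times(3)          // the step must match three times
  .consecutive()     // the matches must be strictly consecutive, like next()
  .within(Time.seconds(10))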
3.2 Order timeout detection

package test7

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.Map

object OrderTimeout {

  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .fromElements(
        OrderEvent("order_1", "create", 2000L),
        OrderEvent("order_2", "create", 3000L),
        OrderEvent("order_2", "pay", 4000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    // an order must be paid within 5 seconds of being created
    val pattern = Pattern
      .begin[OrderEvent]("create").where(_.eventType.equals("create"))
      .next("pay").where(_.eventType.equals("pay"))
      .within(Time.seconds(5))

    val patternedStream = CEP.pattern(stream, pattern)

    val orderTimeoutOutput = new OutputTag[String]("timeout")

    // invoked for partial matches that time out: created but not paid within the window
    val timeoutFunc = (map: Map[String, Iterable[OrderEvent]], ts: Long, out: Collector[String]) => {
      println("ts" + ts)
      val orderStart = map("create").head
      out.collect(orderStart.orderId + " was not paid!")
    }

    // invoked for complete matches: create followed by pay within the window
    val selectFunc = (map: Map[String, Iterable[OrderEvent]], out: Collector[String]) => {
      val order = map("pay").head
      out.collect(order.orderId + " has been paid!")
    }

    val outputStream = patternedStream.flatSelect(orderTimeoutOutput)(timeoutFunc)(selectFunc)

    outputStream.print()
    outputStream.getSideOutput(new OutputTag[String]("timeout")).print()

    env.execute()
  }
}
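With the sample data above, one would expect order_2 (created at 3000 and paid at 4000, inside the 5-second window) to reach the main output via selectFunc, while order_1, which is never paid, should be emitted to the "timeout" side output by timeoutFunc once the watermark passes its window at the end of the bounded input.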