热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

十三FlinkCEP

文章目录1概念2使用2.1pom文件2.2使用2.3PatternAPI3案例3.1检测连续三次登录失败的事件3.2订单超时检测1概念一个或多个由简单事件构成的事件流通过一定的

文章目录

  • 1 概念
  • 2 使用
    • 2.1 pom文件
    • 2.2 使用
    • 2.3 Pattern API
  • 3 案例
    • 3.1 检测连续三次登录失败的事件
    • 3.2 订单超时检测


1 概念

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。
特征:
• 目标:从有序的简单事件流中发现一些高阶特征
• 输入:一个或多个由简单事件构成的事件流
• 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
• 输出:满足规则的复杂事件
在这里插入图片描述
在这里插入图片描述


2 使用


2.1 pom文件

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

2.2 使用

Event Streams
登录事件流

case class LoginEvent(userId: String,ip: String,eventType: String,eventTime: String
)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val loginEventStream = env.fromCollection(List(LoginEvent("1", "192.168.0.1", "fail", "1558430842"),LoginEvent("1", "192.168.0.2", "fail", "1558430843"),LoginEvent("1", "192.168.0.3", "fail", "1558430844"),LoginEvent("2", "192.168.10.10", "success", "1558430845")
))
.assignAscendingTimestamps(_.eventTime.toLong * 1000)

2.3 Pattern API

每个 Pattern 都应该包含几个步骤,或者叫做 state。从一个 state 到另一个 state,通常我们需要定义一些条件,例如下列的代码;

/**
选出在10秒内连续两个两次登录失败
*/

val loginFailPattern = Pattern.begin[LoginEvent]("begin") //第一个event
.where(_.eventType.equals("fail"))
.next("next") //第二个event
.where(_.eventType.equals("fail"))
.within(Time.seconds(10)

start:一个event
where: 是过滤条件
next:下一个event(要紧挨着)
followedBy : 下一个不要挨着
or: 或
within: 指定在某段时间内
我们也可以通过 subtype 来限制 event 的子类型:

start.subtype(SubEvent.class).where(...);

然后就可以上在指定流上根据pattern 来过滤出来想要的数据流

val input = ...
val pattern = ...目录 163
val patternStream = CEP.pattern(input, pattern)
val patternStream = CEP
.pattern(
loginEventStream.keyBy(_.userId), loginFailPattern
)

一旦获得 PatternStream,我们就可以通过 select 或 flatSelect,从一个 Map 序列找到我们需要的告警信息

使用select选出我们的数据

select 方法需要实现一个 PatternSelectFunction,通过 select 方法来输出需要的警告。它接受
一个 Map 对,包含 string/event,其中 key 为 state 的名字, event 则为真是的 Event。
val loginFailDataStream = patternStream
.select((pattern: Map[String, Iterable[LoginEvent]]) => {
val first = pattern.getOrElse("begin", null).iterator.next()
val second = pattern.getOrElse("next", null).iterator.next()
(second.userId, second.ip, second.eventType)
})
其返回值仅为 1 条记录。flatSelect通过实现 PatternFlatSelectFunction,实现与 select 相似的功能。唯一的区别就是 flatSelect 方法可以返回多条记录。

3 案例


3.1 检测连续三次登录失败的事件

package org.example.cepimport org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeimport scala.collection.Map// 检测连续三次登录失败的事件
object CepExample {case class LoginEvent(userId: String, ip: String, eventType: String, eventTime: Long)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = env.fromElements(LoginEvent("user_1", "0.0.0.0", "fail", 1000L),LoginEvent("user_1", "0.0.0.1", "fail", 2000L),LoginEvent("user_1", "0.0.0.2", "fail", 3000L),LoginEvent("user_2", "0.0.0.0", "success", 4000L)).assignAscendingTimestamps(_.eventTime).keyBy(_.userId)// 定义需要匹配的模板val pattern = Pattern.begin[LoginEvent]("first").where(_.eventType.equals("fail")).next("second").where(_.eventType.equals("fail")).next("third").where(_.eventType.equals("fail")).within(Time.seconds(10)) // 10s之内连续三次登录失败// 第一个参数:需要匹配的流,第二个参数:模板val patternedStream = CEP.pattern(stream, pattern)patternedStream.select(func).print()env.execute()}// 注意匿名函数的类型val func = (pattern: Map[String, Iterable[LoginEvent]]) => {val first = pattern.getOrElse("first", null).iterator.next()val second = pattern.getOrElse("second", null).iterator.next()val third = pattern.getOrElse("third", null).iterator.next()first.userId + "连续三次登录失败!"}
}

3.2 订单超时检测

package test7import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collectorimport scala.collection.Mapobject OrderTimeout {case class OrderEvent(orderId: String, eventType: String, eventTime: Long)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val stream = env.fromElements(OrderEvent("order_1", "create", 2000L),OrderEvent("order_2", "create", 3000L),OrderEvent("order_2", "pay", 4000L)).assignAscendingTimestamps(_.eventTime).keyBy(_.orderId)val pattern = Pattern.begin[OrderEvent]("create").where(_.eventType.equals("create")).next("pay").where(_.eventType.equals("pay")).within(Time.seconds(5))val patternedStream = CEP.pattern(stream, pattern)// 用来输出超时订单的侧输出标签val orderTimeoutOutput = new OutputTag[String]("timeout")// 用来处理超时订单的函数val timeoutFunc = (map: Map[String, Iterable[OrderEvent]], ts: Long, out: Collector[String]) => {println("ts" + ts) // 2s + 5sval orderStart = map("create").head// 将报警信息发送到侧输出流去out.collect(orderStart.orderId + "没有支付!")}val selectFunc = (map: Map[String, Iterable[OrderEvent]], out: Collector[String]) => {val order = map("pay").headout.collect(order.orderId + "已经支付!")}val outputStream = patternedStream// 第一个参数:用来输出超时事件的侧输出标签// 第二个参数:用来输出超时事件的函数// 第三个参数:用来输出没有超时的事件的函数.flatSelect(orderTimeoutOutput)(timeoutFunc)(selectFunc)outputStream.print()outputStream.getSideOutput(new OutputTag[String]("timeout")).print()env.execute()}
}

推荐阅读
  • 2018年9月21日,Destoon官方发布了安全更新,修复了一个由用户“索马里的海贼”报告的前端GETShell漏洞。该漏洞存在于20180827版本的某CMS中,攻击者可以通过构造特定的HTTP请求,利用该漏洞在服务器上执行任意代码,从而获得对系统的控制权。此次更新建议所有用户尽快升级至最新版本,以确保系统的安全性。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文详细介绍了在CentOS 6.5 64位系统上使用阿里云ECS服务器搭建LAMP环境的具体步骤。首先,通过PuTTY工具实现远程连接至服务器。接着,检查当前系统的磁盘空间使用情况,确保有足够的空间进行后续操作,可使用 `df` 命令进行查看。此外,文章还涵盖了安装和配置Apache、MySQL和PHP的相关步骤,以及常见问题的解决方法,帮助用户顺利完成LAMP环境的搭建。 ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 解决Only fullscreen opaque activities can request orientation错误的方法
    本文介绍了在使用PictureSelectorLight第三方框架时遇到的Only fullscreen opaque activities can request orientation错误,并提供了一种有效的解决方案。 ... [详细]
  • 开发日志:高效图片压缩与上传技术解析 ... [详细]
  • 在PHP中如何正确调用JavaScript变量及定义PHP变量的方法详解 ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 服务器部署中的安全策略实践与优化
    服务器部署中的安全策略实践与优化 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • 本文深入探讨了如何利用Maven高效管理项目中的外部依赖库。通过介绍Maven的官方依赖搜索地址(),详细讲解了依赖库的添加、版本管理和冲突解决等关键操作。此外,还提供了实用的配置示例和最佳实践,帮助开发者优化项目构建流程,提高开发效率。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
author-avatar
伊利纯羊毛
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有