热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink CEP 简单应用

连续两次登录失败 import org.apache.flink.cep.PatternSelectFunction import org.apache.flink.cep.scala.{…

连续两次登录失败

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import java.util
/**
 * One login attempt parsed from a comma-separated socket line.
 *
 * Sample input line: `001,192.168.0.1,fail,1646038440`
 *
 * @param uid    user id (first field)
 * @param ip     client address (second field)
 * @param status login result; the CEP pattern in LoginCEP matches the value "fail"
 * @param ts     fourth field; LoginCEP multiplies it by 1000 to get an event-time
 *               timestamp, so presumably epoch seconds — confirm against the producer
 */
case class Login(
    uid: String,
    ip: String,
    status: String,
    ts: Long
)
/**
 * Flink CEP example: emit an alert when the same user fails to log in twice in a row.
 *
 * Reads lines of the form `uid,ip,status,ts` from a socket, keys them by `uid`, and matches
 * a "fail" event immediately followed by another "fail" event for that user.
 *
 * @see https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/cep/
 */
object LoginCEP {
  def main(args: Array[String]): Unit = {
    // Stream execution environment, run with parallelism 1.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Source: raw text lines from the socket.
    val stream: DataStream[String] = env.socketTextStream("hdp1", 9999)

    // Parse each line into a Login. A line with fewer than four fields or a non-numeric
    // fourth field is dropped instead of throwing, which would otherwise fail the whole job.
    val loginDS: DataStream[Login] = stream.flatMap { (line: String) =>
      scala.util.Try {
        val li: Array[String] = line.split(",")
        Login(li(0), li(1), li(2), li(3).trim.toLong)
      }.toOption.toList
    }

    // `ts` is multiplied by 1000 to obtain a millisecond event-time timestamp.
    val timeDS: DataStream[Login] = loginDS.assignAscendingTimestamps(_.ts * 1000)

    // 1. Input: events keyed by user id, so the pattern is evaluated per user.
    val eventDS: KeyedStream[Login, String] = timeDS.keyBy(_.uid)

    // 2. Pattern: a "fail" event directly followed (`next` = strict contiguity)
    //    by another "fail" event.
    val pattern: Pattern[Login, Login] = Pattern
      .begin[Login]("start").where(_.status.equals("fail"))
      .next("next").where(_.status.equals("fail"))

    // 3. Apply the pattern to the keyed input.
    val result: PatternStream[Login] = CEP.pattern(eventDS, pattern)

    // 4. Output: one alert string per match.
    val alertDS: DataStream[String] = result.select(new MyPatterSelectFunction)
    alertDS.print()
    env.execute()
  }
}
/**
 * Converts each "two consecutive failed logins" match into an alert line.
 *
 * The alert is a fixed prefix followed by the `toString` of the match map.
 * That map goes from pattern-stage name ("start" / "next") to the events matched by that stage.
 */
class MyPatterSelectFunction extends PatternSelectFunction[Login, String] {
  override def select(map: util.Map[String, util.List[Login]]): String = {
    val prefix = "连续两次登录失败警告:"
    s"$prefix$map"
  }
}

 


分流

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import java.util
import java.util.Properties
import scala.util.parsing.json.JSON
/**
 * Flink CEP "jump-out" example.
 *
 * Reads comma-separated records from Kafka topic `dwd_page_log` and keys them by their first field.
 * It then matches two consecutive records whose second field is not "last_page_id" within 10 seconds.
 * Complete matches are printed and written to Kafka topic `dwm_user_jump_detail`.
 * Partial matches that time out are printed from a side output.
 */
object t1 {
  def main(args: Array[String]): Unit = {
    // Flink stream execution environment with parallelism 1.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event-time semantics (this setter is deprecated in newer Flink versions).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka connection settings; shared by the consumer and the producer below.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hdp1:9092")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer[String]("dwd_page_log", new SimpleStringSchema(), properties))

    // Parse `a,b,ts` into Stu(a, b, ts * 1000). Records with fewer than three fields or a
    // non-numeric third field are dropped instead of failing the job.
    // Fields are used verbatim. Routing them through `JSON.formatted(field)` would run
    // `field.format(JSON)`, i.e. treat the data as a format string, so a '%' would throw.
    val v1: DataStream[Stu] = stream.flatMap { (x: String) =>
      scala.util.Try {
        val ss: Array[String] = x.split(",")
        Stu(ss(0), ss(1), ss(2).trim.toLong * 1000L)
      }.toOption.toList
    }

    // Event-time timestamps come from `time`; key by `str` (reported as the user id by MyTime).
    val ks: KeyedStream[Stu, String] = v1.assignAscendingTimestamps(_.time).keyBy(_.str)

    // Pattern: a record without last_page_id, immediately followed by another record without
    // last_page_id, both within 10 seconds. Both stages test `str2`. `str` is the key (user id),
    // so comparing it with "last_page_id" would not express the intended filter.
    val pattern: Pattern[Stu, Stu] = Pattern
      .begin[Stu]("start").where(!_.str2.equals("last_page_id"))
      .next("next").where(!_.str2.equals("last_page_id"))
      .within(Time.seconds(10))

    // Apply the pattern to the keyed stream.
    val ps: PatternStream[Stu] = CEP.pattern(ks, pattern)

    // Side-output tag for timed-out partial matches.
    val lag = new OutputTag[String]("outlag")
    // Complete matches -> main output (MySel); timed-out partial matches -> `lag` (MyTime).
    val ds: DataStream[String] = ps.select(lag, new MyTime, new MySel)

    ds.print("跳出用户")
    ds.getSideOutput(lag).print("超时:")

    // Write the main (matched) stream to Kafka topic dwm_user_jump_detail.
    val value = new FlinkKafkaProducer[String]("dwm_user_jump_detail", new SimpleStringSchema(), properties)
    ds.addSink(value)
    env.execute()
  }
}
/**
 * Entity (record) type for the CEP job in object t1.
 *
 * @param str  first field; t1 keys the stream by it and MyTime reports it as the user id
 * @param str2 second field; the CEP pattern in t1 compares it with the literal "last_page_id"
 * @param time event-time timestamp; t1 stores the parsed third field multiplied by 1000
 */
case class Stu(
    str: String,
    str2: String,
    time: Long
)
/**
 * Timeout handler used by `select` in t1.
 *
 * Called for partial matches that did not complete within the pattern's time window.
 * It builds a message from the `str` field of the first event matched by the "start" stage.
 */
class MyTime extends PatternTimeoutFunction[Stu, String] {
  override def timeout(map: util.Map[String, util.List[Stu]], l: Long): String = {
    // `l` (the timeout timestamp) is not used.
    val startEvents: util.List[Stu] = map.get("start")
    val userId: String = startEvents.get(0).str
    // Format: "id1用户id:<user id>超时" — the "1" is a fixed literal, not a counter.
    s"id1用户id:${userId}超时"
  }
}
/**
 * Select handler for complete matches of the pattern in t1.
 *
 * It renders the whole match map (pattern-stage name -> matched events) using the map's Java `toString`.
 */
class MySel extends PatternSelectFunction[Stu, String] {
  override def select(map: util.Map[String, util.List[Stu]]): String = {
    val rendered: String = map.toString
    rendered
  }
}

 

我有一杯酒,足以慰风尘。



推荐阅读
author-avatar
201153蜡笔小新
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有