作者:201153蜡笔小新 | 来源:互联网 | 2023-10-10 10:01
连续两次登录失败（Flink CEP 检测示例）
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import java.util
// Sample input line: 001,192.168.0.1,fail,1646038440
/**
 * One login event parsed from the socket stream.
 *
 * @param uid    user id (first CSV field)
 * @param ip     client ip (second CSV field)
 * @param status "fail" or "success" (third CSV field)
 * @param ts     event time in epoch seconds (fourth CSV field)
 */
case class Login(uid: String, ip: String, status: String, ts: Long)
/**
* https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/cep/
*/
object LoginCEP {
  /**
   * Detects two consecutive failed logins per user with Flink CEP.
   *
   * Reads CSV lines ("uid,ip,status,epochSeconds") from a socket,
   * keys the stream by user id, applies a "fail next fail" pattern,
   * and prints one alert string per match.
   */
  def main(args: Array[String]): Unit = {
    // Stream execution environment; parallelism 1 keeps the printed output ordered.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Source: raw CSV lines from the socket.
    val rawLines: DataStream[String] = env.socketTextStream("hdp1", 9999)

    // Parse each line into a Login event.
    val logins: DataStream[Login] = rawLines.map { line =>
      val fields: Array[String] = line.split(",")
      Login(fields(0), fields(1), fields(2), fields(3).trim.toLong)
    }

    // Event time: ts is in seconds, Flink expects milliseconds.
    val withTimestamps: DataStream[Login] = logins.assignAscendingTimestamps(_.ts * 1000)

    // 1. Key the input by user id so the pattern is evaluated per user.
    val keyed: KeyedStream[Login, String] = withTimestamps.keyBy(_.uid)

    // 2. Pattern: a "fail" event immediately followed by another "fail" event.
    val pattern: Pattern[Login, Login] = Pattern
      .begin[Login]("start")
      .where(_.status.equals("fail"))
      .next("next")
      .where(_.status.equals("fail"))

    // 3. Apply the pattern to the keyed stream.
    val matches: PatternStream[Login] = CEP.pattern(keyed, pattern)

    // 4. Turn each match into an alert string and print it.
    val alerts: DataStream[String] = matches.select(new MyPatterSelectFunction)
    alerts.print()

    env.execute()
  }
}
/**
 * Converts a full CEP match into an alert string.
 *
 * The map passed to [[select]] holds, for every pattern stage name
 * ("start", "next"), the list of events that matched that stage.
 */
class MyPatterSelectFunction extends PatternSelectFunction[Login, String] {
  override def select(map: util.Map[String, util.List[Login]]): String =
    // Warning message with the complete matched-event map appended.
    s"连续两次登录失败警告:$map"
}
分流
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import java.util
import java.util.Properties
import scala.util.parsing.json.JSON
object t1 {
  /**
   * User-jump (bounce) detection with Flink CEP.
   *
   * Consumes page-log records from Kafka topic "dwd_page_log", matches two
   * consecutive events without a last_page_id within 10 seconds, emits
   * timed-out (incomplete) matches to a side output, and writes the main
   * result stream back to Kafka topic "dwm_user_jump_detail".
   *
   * Fixes over the original:
   *  - `StreamExecutiOnEnvironment` (capital O) was a compile-breaking typo
   *    in the type annotation.
   *  - `scala.util.parsing.json.JSON.formatted(ss(i))` was a misuse of
   *    Predef's `StringFormat.formatted` enrichment (it treats the data as a
   *    format string applied to the `JSON` object); the split values are now
   *    used directly.
   */
  def main(args: Array[String]): Unit = {
    // Flink stream execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism 1 keeps printed output ordered.
    env.setParallelism(1)
    // Event-time semantics (deprecated API, kept for compatibility with this Flink version).
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer configuration.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hdp1:9092")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer[String]("dwd_page_log", new SimpleStringSchema(), properties))

    // Parse "field0,field1,epochSeconds" into a Stu record; time is converted to ms.
    val parsed: DataStream[Stu] = stream.map { line =>
      val fields: Array[String] = line.split(",")
      Stu(fields(0), fields(1), fields(2).trim.toLong * 1000L)
    }

    // Assign ascending event timestamps and key by the first field.
    val keyed: KeyedStream[Stu, String] = parsed.assignAscendingTimestamps(_.time).keyBy(_.str)

    // Pattern: first event without last_page_id, immediately followed by another
    // event without last_page_id, both within 10 seconds.
    // NOTE(review): the two stages test different fields (str vs str2) — looks
    // intentional per the original, but verify against the upstream schema.
    val pattern: Pattern[Stu, Stu] = Pattern
      .begin[Stu]("start")
      .where(!_.str.equals("last_page_id"))
      .next("next")
      .where(!_.str2.equals("last_page_id"))
      .within(Time.seconds(10))

    // Apply the pattern to the keyed stream.
    val patternStream: PatternStream[Stu] = CEP.pattern(keyed, pattern)

    // Side-output tag for matches that time out before completing.
    val timeoutTag = new OutputTag[String]("outlag")

    // select: completed matches go to the main stream, timeouts to the side output.
    val jumps: DataStream[String] = patternStream.select(timeoutTag, new MyTime, new MySel)

    jumps.print("跳出用户")
    jumps.getSideOutput(timeoutTag).print("超时:")

    // Sink the jump-user stream to Kafka.
    val producer = new FlinkKafkaProducer[String]("dwm_user_jump_detail", new SimpleStringSchema(), properties)
    jumps.addSink(producer)

    env.execute()
  }
}
/**
 * Page-log record used by the jump-detection job.
 *
 * @param str  first CSV field (keyed on)
 * @param str2 second CSV field
 * @param time event time in epoch milliseconds
 */
case class Stu(str: String, str2: String, time: Long)
/**
 * Handles matches that time out before completing: builds a timeout
 * message from the first event of the "start" stage.
 */
class MyTime extends PatternTimeoutFunction[Stu, String] {
  override def timeout(map: util.Map[String, util.List[Stu]], l: Long): String = {
    // First event matched by the "start" stage of the pattern.
    val firstUid: String = map.get("start").get(0).str
    s"id1用户id:${firstUid}超时"
  }
}
/**
 * Handles completed matches: emits the matched-event map rendered
 * via its default `toString`.
 */
class MySel extends PatternSelectFunction[Stu, String] {
  override def select(map: util.Map[String, util.List[Stu]]): String = s"$map"
}
我有一杯酒,足以慰风尘。