技术方案:Spark、kafka、opentsdb、Yahoo的egads
模型静态训练:采用两种算法进行模型的训练:指数移动平均和HotWinters,模型一天训练一次,即每天0点开始训练,每天凌晨0:5分根据训练好的模型进行异常检测,具体包括点的预测以及点的异常检测;
模型实时训练:HotWinters根据3个指标进行预测,其中两个可以进行静态的训练,另外一个指标在进行异常检测之后要 对模型进行实时的训练;对模型进行实时训练就要求每个批次的数据训练的结果做一个有状态的保存,经过调查采用mapWithState算子进行实现;
模型的训练流程为 根据 配置的要训练的指标从opentsdb时序数据库中读取数据,然后调用opentsdb的接口进行训练,把训练好的模型保存到相对应的路径;
模型预存流程为 根据要预测的指标通过spark streamimg从kafka实时读取数据,读取的点首先进行预测,然后在进行异常检测;当模型不进行实时训练时,模型广播到excutor端进行优化,当需要对模型进行实时训练时间,直接把模型包装成RDD即可,代码如下ModelUpdateOnline类所示
思考过程:对模型进行实时训练问题,由于spark的算子与egads不兼容,每个批次预测之后,再DStream.foreachRDD方法中,把RDD的数据collect到driver端,然后,再进行模型的训练,训练之后 再广播到各个executor,这样每个批次都要进行广播,当广播模型比较多时,网络开销特别大;通过调研采用mapWithState来保证增量更新的状态,优势,不需要每批次模型被增量更新后都要存储到redis,下一个批次再从redis读取数据,这样网络开销也比较大。
优化点:1)目前这些配置文件和训练好的模型都在服务器本地文件系统中,后续把这些文件放到hdfs上面以保证spark程序在预测时间能够driver模式;
2)模型训练当模型比较多时间,由于采用单线程 性能是一个瓶颈。
遇到的问题:1)模型太多,要同时训练多个模型;2)动态训练模型时,要能保证模型更新的状态(调研后,采用mapWithState算子)
package com.tingyun.mlpredict.done
import com.networkbench.avro.cache.ZookeeperAvroSchemaPersister
import com.networkbench.avro.serialize.AvroMessageDecoder
import com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own.MonitorWrappedMessage
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import com.yahoo.egads.control.ModelAdapter
import com.yahoo.egads.data.TimeSeries
object ModelUpdateOnline{
def main(args: Array[String]) {
val sparkCOnf= new SparkConf().setAppName("StreamingAnomalyDetector")
val ssc = new StreamingContext(sparkConf, Minutes(1))
val sc = ssc.sparkContext
ssc.checkpoint("E:\\tmp")
//val mAdapter = Egads.loadModel("E:\\andy_ty\\work\\ml_egads\\anomolydetection\\src\\main\\resources\\mem\\2017-08-30_127082_2897_TripleExponentialSmoothingModel")
//val initialRDD = ssc.sparkContext.parallelize(List[(String, ModelAdapter)](("127287_-1",TestModel()),("127287_3272",mAdapter),("127116_-1",mAdapter),("126887_2552",mAdapter),("127082_2897",mAdapter)))
val initialRDD = List[(String,TestModel)](("127287_-1",TestModel(Seq[MonitorWrappedMessage]())),("127287_3272",TestModel(Seq[MonitorWrappedMessage]())),("127116_-1",TestModel(Seq[MonitorWrappedMessage]())),("126887_2552",TestModel(Seq[MonitorWrappedMessage]())))
var initialRddBC = sc.broadcast(initialRDD)
val numThreads = "2"
val topics = "alarm-detect-streaming"
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaParams = Map[String, String]("zookeeper.connect" -> "10.194.1.2:2181,10.194.1.12:2181,10.194.1.13:2181", "group.id" -> "group01","zookeeper.connection.timeout.ms" -> "10000")
val monitorWrappedMessage1 = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).mapPartitions( partitiOns=> {
val zookeeperAvroSchemaPersister = new ZookeeperAvroSchemaPersister
zookeeperAvroSchemaPersister.setServers("10.194.1.2:2181")
zookeeperAvroSchemaPersister.setConnectionTimeout(10000)
zookeeperAvroSchemaPersister.init()
val avroMessageDecoder = new AvroMessageDecoder
avroMessageDecoder.setAvroMessageEntityPackageToScan("com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own")
avroMessageDecoder.setAvroSchemaPersister(zookeeperAvroSchemaPersister)
val mWMessage = partitions.map(line => avroMessageDecoder.decode(line._2).asInstanceOf[MonitorWrappedMessage]).toList
zookeeperAvroSchemaPersister.destroy() // 关闭zk链接
mWMessage.toIterator
})
monitorWrappedMessage1.print(100)
val mOnitorWrappedMessage= monitorWrappedMessage1.map(mmm => (mmm.getApplicationId + "_" + mmm.getApplicationInstanceId,mmm))
/* val params = Map("bootstrap.servers" -> "master:9092", "group.id" -> "scala-stream-group")
val topic = Set("test")
val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)
val word = messages.flatMap(_._2.split(" ")).map { x => (x, 1) }*/
//自定义mappingFunction,累加单词出现的次数并更新状态
def mappingFuncDemo(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[ModelAdapter]):Option[(String,ModelAdapter)] = {
/*state.get.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getTimestamp,1f))
state.update(state.get())*/
/* val preMA = state.getOption().getOrElse(new ModelAdapter())
preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
state.update(preMA)
val output = (word, preMA)
Some(output)*/
val preMA = state.getOption()
var ma = new ModelAdapter()
preMA match{
case Some(modelAdapter) =>{ println(modelAdapter.firstTimeStamp + "==111=" + monitorWrappedMessage.get.getApplicationId);
ma = preMA.get
}
case _ =>{
println( "=222==" + monitorWrappedMessage.get.getApplicationId);
}
}
/*preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
state.update(preMA)*/
val output = (word, ma)
ma.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
Some(output)
}
//word来自于DStream中的key,monitorWrappedMessage来自于DStream中的value,state参数来自于initialState初始化的RDD,当不初始化则来自于 创建的默认空置(即val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages.getOrElse(Seq[MonitorWrappedMessage]()))
//initialState初始化的RDD为prevRDD,当前批次为currentRDD;当没有通过initialState初始化的RDD时,则prevRDD为新创建的 空对象。
def mappingFunc(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[TestModel]):Option[(String,TestModel)] = {
val preMA = state.getOption()
//var ma = new TestModel(Seq[MonitorWrappedMessage]())
preMA match{
case Some(testModel) =>{
//println(monitorWrappedMessage.get.getApplicationId +"==111=="+ testModel.monitorWrappedMessages );
val testModelnew = TestModel(monitorWrappedMessage.get +: testModel.monitorWrappedMessages)
state.update(testModelnew)
Some((word,testModelnew))
}
case _ =>{
//println( "=222==" + monitorWrappedMessage.get.getApplicationId);
None;
}
}
/* val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages)
.getOrElse(Seq[MonitorWrappedMessage]()) 当没有初始化RDD时则创建默认值*/
/* val testModel = TestModel(monitorWrappedMessage.get +: existingEvents)
state.update(testModel)*/
//Some((word,ma))
}
// 当initialState 初始化,第一个批次会从 这个实例好的rdd 对应的 map中根据key(就是word,来自于 上一个DSTream中的key)取值,并执行 mappingFunc 中的业务逻辑;
// 当没有通过initialState 初始化,在创建时间要添加
//调用mapWithState进行管理流数据的状态
val stateDstream = monitorWrappedMessage.mapWithState(StateSpec.function(mappingFunc _).initialState(sc.parallelize(initialRDD)).timeout(Minutes(5))).map(
ll => {ll match {
case Some(test) =>{test._1 +"===33333=="+ test._2.monitorWrappedMessages}
case _ => {"======NODATA======="}
}
}
).print()
ssc.start()
ssc.awaitTermination()
}
}