热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于时间序列的异常检测系统的实现思路之一

技术方案:Spark、kafka、opentsdb、Yahoo的egads模型静态训练:采用两种算法进行模型的训练:指数移动平均和HotWinters,模型一天训练一次,即每天0点开始训练,
技术方案:Spark、kafka、opentsdb、Yahoo的egads
模型静态训练:采用两种算法进行模型的训练:指数移动平均和HotWinters,模型一天训练一次,即每天0点开始训练,每天凌晨0:5分根据训练好的模型进行异常检测,具体包括点的预测以及点的异常检测;
模型实时训练:HotWinters根据3个指标进行预测,其中两个可以进行静态的训练,另外一个指标在进行异常检测之后要 对模型进行实时的训练;对模型进行实时训练就要求每个批次的数据训练的结果做一个有状态的保存,经过调查采用mapWithState算子进行实现;


模型的训练流程为  根据 配置的要训练的指标从opentsdb时序数据库中读取数据,然后调用opentsdb的接口进行训练,把训练好的模型保存到相对应的路径;
模型预存流程为 根据要预测的指标通过spark streamimg从kafka实时读取数据,读取的点首先进行预测,然后在进行异常检测;当模型不进行实时训练时,模型广播到excutor端进行优化,当需要对模型进行实时训练时间,直接把模型包装成RDD即可,代码如下ModelUpdateOnline类所示


思考过程:对模型进行实时训练问题,由于spark的算子与egads不兼容,每个批次预测之后,再DStream.foreachRDD方法中,把RDD的数据collect到driver端,然后,再进行模型的训练,训练之后 再广播到各个executor,这样每个批次都要进行广播,当广播模型比较多时,网络开销特别大;通过调研采用mapWithState来保证增量更新的状态,优势,不需要每批次模型被增量更新后都要存储到redis,下一个批次再从redis读取数据,这样网络开销也比较大。


优化点:1)目前这些配置文件和训练好的模型都在服务器本地文件系统中,后续把这些文件放到hdfs上面以保证spark程序在预测时间能够driver模式;

2)模型训练当模型比较多时间,由于采用单线程  性能是一个瓶颈。


遇到的问题:1)模型太多,要同时训练多个模型;2)动态训练模型时,要能保证模型更新的状态(调研后,采用mapWithState算子)



package com.tingyun.mlpredict.done




import com.networkbench.avro.cache.ZookeeperAvroSchemaPersister
import com.networkbench.avro.serialize.AvroMessageDecoder
import com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own.MonitorWrappedMessage
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import com.yahoo.egads.control.ModelAdapter
import com.yahoo.egads.data.TimeSeries


object ModelUpdateOnline{




  def main(args: Array[String]) {


    val sparkCOnf= new SparkConf().setAppName("StreamingAnomalyDetector")
    val ssc = new StreamingContext(sparkConf, Minutes(1))
    val sc = ssc.sparkContext
    ssc.checkpoint("E:\\tmp")


    //val mAdapter = Egads.loadModel("E:\\andy_ty\\work\\ml_egads\\anomolydetection\\src\\main\\resources\\mem\\2017-08-30_127082_2897_TripleExponentialSmoothingModel")
    //val initialRDD = ssc.sparkContext.parallelize(List[(String, ModelAdapter)](("127287_-1",TestModel()),("127287_3272",mAdapter),("127116_-1",mAdapter),("126887_2552",mAdapter),("127082_2897",mAdapter)))
    val initialRDD = List[(String,TestModel)](("127287_-1",TestModel(Seq[MonitorWrappedMessage]())),("127287_3272",TestModel(Seq[MonitorWrappedMessage]())),("127116_-1",TestModel(Seq[MonitorWrappedMessage]())),("126887_2552",TestModel(Seq[MonitorWrappedMessage]())))
    var initialRddBC = sc.broadcast(initialRDD)
    val numThreads = "2"
    val topics = "alarm-detect-streaming"
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaParams = Map[String, String]("zookeeper.connect" -> "10.194.1.2:2181,10.194.1.12:2181,10.194.1.13:2181", "group.id" -> "group01","zookeeper.connection.timeout.ms" -> "10000")
    val monitorWrappedMessage1 =  KafkaUtils.createStream[String,  Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).mapPartitions( partitiOns=> {
      val zookeeperAvroSchemaPersister = new ZookeeperAvroSchemaPersister
      zookeeperAvroSchemaPersister.setServers("10.194.1.2:2181")
      zookeeperAvroSchemaPersister.setConnectionTimeout(10000)
      zookeeperAvroSchemaPersister.init()
      val avroMessageDecoder = new AvroMessageDecoder
      avroMessageDecoder.setAvroMessageEntityPackageToScan("com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own")
      avroMessageDecoder.setAvroSchemaPersister(zookeeperAvroSchemaPersister)
      val mWMessage = partitions.map(line => avroMessageDecoder.decode(line._2).asInstanceOf[MonitorWrappedMessage]).toList
      zookeeperAvroSchemaPersister.destroy()  //  关闭zk链接
      mWMessage.toIterator
    })
    monitorWrappedMessage1.print(100)
   val mOnitorWrappedMessage= monitorWrappedMessage1.map(mmm => (mmm.getApplicationId + "_" + mmm.getApplicationInstanceId,mmm))
   /* val params = Map("bootstrap.servers" -> "master:9092", "group.id" -> "scala-stream-group")
    val topic = Set("test")
    val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)
    val word = messages.flatMap(_._2.split(" ")).map { x => (x, 1) }*/
    //自定义mappingFunction,累加单词出现的次数并更新状态
   def mappingFuncDemo(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[ModelAdapter]):Option[(String,ModelAdapter)] =  {
      /*state.get.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getTimestamp,1f))
      state.update(state.get())*/
     /* val preMA = state.getOption().getOrElse(new ModelAdapter())
      preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
      state.update(preMA)
      val output = (word, preMA)
      Some(output)*/
     val preMA = state.getOption()
      var ma = new ModelAdapter()
      preMA match{
        case Some(modelAdapter) =>{ println(modelAdapter.firstTimeStamp + "==111=" + monitorWrappedMessage.get.getApplicationId);
          ma = preMA.get
                 }
        case _ =>{
          println( "=222==" + monitorWrappedMessage.get.getApplicationId);
        }
      }
      /*preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
      state.update(preMA)*/
      val output = (word, ma)
      ma.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
      Some(output)
    }
    //word来自于DStream中的key,monitorWrappedMessage来自于DStream中的value,state参数来自于initialState初始化的RDD,当不初始化则来自于  创建的默认空置(即val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages.getOrElse(Seq[MonitorWrappedMessage]()))
    //initialState初始化的RDD为prevRDD,当前批次为currentRDD;当没有通过initialState初始化的RDD时,则prevRDD为新创建的 空对象。
    def mappingFunc(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[TestModel]):Option[(String,TestModel)] =  {
      val preMA = state.getOption()
      //var ma = new TestModel(Seq[MonitorWrappedMessage]())
      preMA match{
        case Some(testModel) =>{
          //println(monitorWrappedMessage.get.getApplicationId +"==111=="+ testModel.monitorWrappedMessages );
          val  testModelnew = TestModel(monitorWrappedMessage.get +: testModel.monitorWrappedMessages)
          state.update(testModelnew)
          Some((word,testModelnew))
        }
        case _ =>{
          //println( "=222==" + monitorWrappedMessage.get.getApplicationId);
          None;
        }
      }
     /* val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages)
          .getOrElse(Seq[MonitorWrappedMessage]())  当没有初始化RDD时则创建默认值*/


     /* val testModel = TestModel(monitorWrappedMessage.get +: existingEvents)
      state.update(testModel)*/
      //Some((word,ma))
    }
    // 当initialState 初始化,第一个批次会从 这个实例好的rdd  对应的   map中根据key(就是word,来自于  上一个DSTream中的key)取值,并执行  mappingFunc 中的业务逻辑;
    // 当没有通过initialState 初始化,在创建时间要添加
    //调用mapWithState进行管理流数据的状态
    val stateDstream = monitorWrappedMessage.mapWithState(StateSpec.function(mappingFunc _).initialState(sc.parallelize(initialRDD)).timeout(Minutes(5))).map(
       ll => {ll match {
         case Some(test) =>{test._1 +"===33333=="+ test._2.monitorWrappedMessages}
         case _ => {"======NODATA======="}
       }
       }
    ).print()
    ssc.start()
    ssc.awaitTermination()
  }


}


推荐阅读
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文介绍了.hbs文件作为Ember.js项目中的视图层,类似于HTML文件的功能,并详细讲解了如何在Ember.js应用中集成Bootstrap框架及其相关组件的方法。 ... [详细]
  • 问题场景用Java进行web开发过程当中,当遇到很多很多个字段的实体时,最苦恼的莫过于编辑字段的查看和修改界面,发现2个页面存在很多重复信息,能不能写一遍?有没有轮子用都不如自己造。解决方式笔者根据自 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • IO流——字符流 BufferedReader / BufferedWriter 进行文件读写
    目录节点流、处理流读文件:BufferedReader的使用写文件:BufferedWriter的使用节点流处理流节点流和处理流的区别和联系字符流Buf ... [详细]
  • mybatis 详解(七)一对一、一对多、多对多
    mybatis详解(七)------一 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • Maven + Spring + MyBatis + MySQL 环境搭建与实例解析
    本文详细介绍如何使用MySQL数据库进行环境搭建,包括创建数据库表并插入示例数据。随后,逐步指导如何配置Maven项目,整合Spring框架与MyBatis,实现高效的数据访问。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • Python 日志记录模块详解
    日志记录机制是软件开发中不可或缺的一部分,它帮助开发者追踪和调试程序运行时的各种异常。Python 提供了内置的 logging 模块,使我们在代码中记录和管理日志信息变得更加方便。本文将详细介绍如何使用 Python 的 logging 模块。 ... [详细]
  • 电商高并发解决方案详解
    本文以京东为例,详细探讨了电商中常见的高并发解决方案,包括多级缓存和Nginx限流技术,旨在帮助读者更好地理解和应用这些技术。 ... [详细]
  • 整理于2020年10月下旬:总结过去,展望未来Itistoughtodayandtomorrowwillbetougher.butthedayaftertomorrowisbeau ... [详细]
author-avatar
zy一生的最爱_149
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有