热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink入门到实战-阶段四(时间和窗口图解)

本文主要介绍关于flink,大数据的知识点,对【Flink入门到实战-阶段四(时间和窗口图解)】和【flink入门及实战第3讲】有兴趣的朋友可以看下由【顶尖高手养成计划】投稿的技术文章,希望该技术和经

本文主要介绍关于flink,大数据的知识点,对【Flink入门到实战-阶段四(时间和窗口图解)】和【flink入门及实战第3讲】有兴趣的朋友可以看下由【顶尖高手养成计划】投稿的技术文章,希望该技术和经验能帮到你解决你所遇的flink相关技术问题。

flink入门及实战第3讲

Flink 中的时间语义

处理时间(Processing Time

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

事件时间(Event Time)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

另外,除了事件时间和处理时间, Flink 还有一个“摄入时间”( Ingestion Time )的概念, 它是指数据进入 Flink 数据流的时间,也就是 Source 算子读入数据的时间。摄入时间相当于是 事件时间和处理时间的一个中和,它是把 Source 任务的处理时间,当作了数据的产生时间添 加到数据里。这样一来,水位线( watermark )也就基于这个时间直接生成,不需要单独指定 了。这种时间语义可以保证比较好的正确性,同时又不会引入太大的延迟。它的具体行为跟事 件时间非常像,可以当作特殊的事件时间来处理。 水位线

为什么会用到水位线?

主要是解决,分布式数据处理的真确性,如果使用处理时间,那么打比方如果8:59的数据没有在9点的时候到达,那么这个时候8:59的数据就没有办法处理,用到水位线加一定的l乱序延迟那么就可以根据事件的发生时间进行正确的处理

如果出现下游有多个并行子任务的情形,我们 只要将水位线广播出去,就可以通知到所有下游任务当前的时间进度了。 有序流中的水位线(理想情况)

有序流中周期性插入水位线

乱序流中的水位线(实际情况)

乱序流

处理方式,如果后面的数据的时间要比水位线小,那么就是不用改变水位线

但是问题是处理太频繁了,所以这里我们使用的是周期性的处理一批的数据,然后得到最大的水位线用来作为现在的水位线进行往下面进行传播 

上面就会有窗口关闭的问题

 

想解决窗口什么时候关闭,那么就是用到了延迟的机制,就是水位线到2的时候,这个时候我们就是设置为0,就是2的数据到了的时候等2的水位线过来的时候再处理 

下面是一个示例,我们可以使用周期性的方式生成正确的水位线。

      第一个水位线时间戳为 7 ,它表示当前事件时间是 7 秒, 7 秒之前的数据 都已经到齐,之后再也不会有了;同样,第二个、第三个水位线时间戳分别为 12 20 ,表示 11 秒、 20 秒之前的数据都已经到齐,如果有对应的窗口就可以直接关闭了,统计的结果一定 是正确的。这里由于水位线是周期性生成的,所以插入的位置不一定是在时间戳最大的数据后 面。       另外需要注意的是,这里一个窗口所收集的数据,并不是之前所有已经到达的数据。因为 数据属于哪个窗口,是由数据本身的时间戳决定的,一个窗口只会收集真正属于它的那些数据。 也就是说,上图中尽管水位线 W(20) 之前有时间戳为 22 的数据到来, 10~20 秒的窗口中也不 会收集这个数据,进行计算依然可以得到正确的结果。关于窗口的原理,我们会在后面继续展 开讲解。 水位线的特性

      现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础
上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
我们可以总结一下水位线的特性:
⚫ 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
⚫ 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

水位线是基于数据的时间戳生成的 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进 水位线可以通过设置延迟,来保证正确处理乱序数据 一个水位线 Watermark(t) ,表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据 水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对 乱序数据的正确处理。关于这部分内容,我们会稍后进一步展开讲解。 水位线在代码中生成 有序流
stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.
  
   forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner
   
    () { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) );
   
  
乱序流
stream.assignTimestampsAndWatermarks(
                        // 针对乱序流插入水位线,延迟时间设置为 5s
                        WatermarkStrategy.
  
   forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
   
    () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ).print();
   
  
水位线的传递

自己本身的水位线就是上游分区的最小的水位线对于向下游传递的时候,广播自己的水位线给下游所有的水位线 如果接收到的水位线要比现在的水位线小,那么就不改变自己的水位线 窗口

flink的窗口,根据时间的处理时间放到不同的桶里面,spark没有处理事件乱序过来的能力

窗口的分类 按照驱动类型分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取

数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类 型”。 我们最容易想到的就是按照时间段去截取数据,这种窗口就叫作“时间窗口”( Time Window )。这在实际应用中最常见,之前所举的例子也都是时间窗口。除了由时间驱动之外, 窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这种窗口叫作“计 数窗口”( Count Window

按照窗口分配数据的规则分类

滚动窗口(Tumbling Windows

按时间,或者是信息的数量进行滚动

 滑动窗口(Sliding Windows

下面也是按时间还有信息的数量

 会话窗口(Session Windows

他是根据会话的超时时间来划分,这里就不能用计算的概念了

 全局窗口(Global Windows

要自定义一个触发器才能触发下一个窗口的操作

窗口Api 窗口分配器

时间窗口

滚动处理时间窗口
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(...)
滑动处理时间窗口
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(...)
滚动事件时间窗口
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))) .aggregate(...)
滑动事件时间窗口
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(...)

计数窗口

滚动计数窗口
stream.keyBy(...)
.countWindow(10)
滑动计数窗口
stream.keyBy(...)
.countWindow(10,3)
窗口函数

流之间的装换

增量聚合函数:就是数据来一条就把结果计算出来,时间到了的时候,直接把结果往后面传递

全量聚合函数:  就是数据一批到时间在处理 

增量聚合函数reduce

pojo

public class Event {
    public String id;

    public String name;

    public Long timeStemp;

    public Event() {
    }

    public Event(String id, String name, Long timeStemp) {
        this.id = id;
        this.name = name;
        this.timeStemp = timeStemp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", timeStemp=" + timeStemp +
                '}';
    }
}

应用程序

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为5秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction
             
              >() { @Override public Tuple2
              
                reduce(Tuple2
               
                 value1, Tuple2
                
                  value2) throws Exception { return Tuple2.of(value1.f0,value1.f1+value2.f1); } }).print(); env.execute(); } }
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
预聚合函数aggregate
public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为10秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) //窗口规约聚合,这里也就是窗口里面数据的预聚合的功能 .aggregate(new AggregateFunction
             
              , Tuple2
              
               , Tuple2
               
                >() { //初始化中间的状态 @Override public Tuple2
                
                  createAccumulator() { return Tuple2.of("init",0); } //窗口里面每来一个数据就调用一次,第一个数据是新数据,第二个参数就是累加器 @Override public Tuple2
                 
                   add(Tuple2
                  
                    value, Tuple2
                   
                     accumulator) { return Tuple2.of(value.f0,accumulator.f1+value.f1); } //窗口触发的时候调用 @Override public Tuple2
                    
                      getResult(Tuple2
                     
                       accumulator) { return accumulator; } //会话窗口的时候会用到合并的功能,就是两个累加器合并,如果不是会话窗口就不用实现 @Override public Tuple2
                      
                        merge(Tuple2
                       
                         a, Tuple2
                        
                          b) { return Tuple2.of(a.f0,a.f1+b.f1); } }).print(); env.execute(); } }
                        
                       
                      
                     
                    
                   
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
全窗口函数process

就是窗口触发以后,然后所有的数据过来一块处理,上面的预聚合就是窗口里面的数据来一条处理一条,窗口触发的时候就输出结果

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为10秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) // IN, OUT, KEY, W .process(new ProcessWindowFunction
             
              , String, String, TimeWindow>() { //第一个参数是key第二个参数是窗口触发以后过来的所有的数据,第三个参数是上下文对象 //第四个参数是传递给下游的收集器 @Override public void process(String s, Context context, Iterable
              
               > elements, Collector
               
                 out) throws Exception { Integer sum=0; //对于窗口里面过来的数据进行累加 for (Tuple2
                
                  element : elements) { sum += element.f1; } //得到窗口的开始时间和结束的时间 long start = context.window().getStart(); long end = context.window().getEnd(); String res="key: "+s+" start: "+start+" end: "+end+" "+"sum: "+sum; out.collect(res); } }).print(); env.execute(); } }
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  

输出的结果

11> key: a start: 1657974850000 end: 1657974860000 sum: 4
14> key: boy start: 1657974850000 end: 1657974860000 sum: 1
11> key: mary start: 1657974850000 end: 1657974860000 sum: 2
aggregate和process结合使用
public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource
  
    initData = env.addSource(new SourceFunction
   
    () { boolean flag = true; String[] names = {"a", "boy", "mary"}; String[] urls = {"/baidu", "xinlang", "google"}; Random random = new Random(); @Override public void run(SourceContext
    
      ctx) throws Exception { while (flag) { Thread.sleep(1000); ctx.collect(new Event( urls[random.nextInt(3)], names[random.nextInt(3)], Calendar.getInstance().getTimeInMillis() )); } } @Override public void cancel() { flag = false; } }); //对于初始的数据设置水位线 SingleOutputStreamOperator
     
       watermarksData = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 10s WatermarkStrategy.
      
       forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner
       
        () { // 抽取时间戳的逻辑 @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timeStemp; } }) ); //对于数据进行map操作用于后面的聚合 KeyedStream
        
         , String> keyedStream = watermarksData.map(new MapFunction
         
          >() { @Override public Tuple2
          
            map(Event value) throws Exception { return Tuple2.of(value.name, 1); } }).keyBy(new KeySelector
           
            , String>() { @Override public String getKey(Tuple2
            
              value) throws Exception { return value.f0; } }); //对于分组以后的数据进行开窗处理,滑动窗口的大小为10秒 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .aggregate(new AggregateFunction
             
              , Integer, Integer>() { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(Tuple2
              
                value, Integer accumulator) { return accumulator + value.f1; } @Override public Integer getResult(Integer accumulator) { return accumulator; } @Override public Integer merge(Integer a, Integer b) { return a + b; } //第一个参数就是前面aggregate的增量聚合函数过来的值 //第二个参数就是返回的值 //第三个参数就是key //第四个参数默认TimeWindow }, new ProcessWindowFunction
               
                () { @Override public void process(String s, ProcessWindowFunction
                
                 .Context context, Iterable
                 
                   elements, Collector
                  
                    out) throws Exception { //因为窗口触发以后就是在aggregate的作用下,过来的数据就是一个那么我们直接next Integer next = elements.iterator().next(); //得到窗口的开始时间和结束的时间 long start = context.window().getStart(); long end = context.window().getEnd(); String res="key: "+s+" start: "+start+" end: "+end+" "+"sum: "+next; out.collect(res); } }).print(); env.execute(); } }
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  
处理迟到数据(加深窗口和水位线的认识)

应用测试

public class FlinkApp {
    public static void main(String[] args) throws Exception {
        //得到执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource
  
    socketTextStream = env.socketTextStream("master", 9999); //并行度设置为1才能看到效果,因为如果不为1,那么有些分区的水位线就是负无穷 //由于自己的水位线是分区里面最小的水位线,那么自己的一直都是负无穷 //就触发不了水位线的上升 env.setParallelism(1); //第一个参数就一个名字,第二个参数用来表示事件时间 SingleOutputStreamOperator
   
    > initData = socketTextStream.map(new MapFunction
    
     >() { @Override public Tuple2
     
       map(String value) throws Exception { String[] s = value.split(" "); //假设我们在控制台输入的参数是a 15s,那么我们要15*1000才能得到时间戳的毫秒时间 return Tuple2.of(s[0], Long.parseLong(s[1]) * 1000L); } }); //设置水位线 SingleOutputStreamOperator
      
       > watermarks = initData.assignTimestampsAndWatermarks( // 针对乱序流插入水位线,延迟时间设置为 2s WatermarkStrategy.
       
        >forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner
        
         >() { @Override public long extractTimestamp(Tuple2
         
           element, long recordTimestamp) { //指定事件时间 return element.f1; } }) ); //定义一个侧输出流的标识 OutputTag
          
           > outputTag = new OutputTag
           
            >("late") { }; SingleOutputStreamOperator
            
             > result = watermarks.keyBy(data -> data.f0) //窗口的大小为10s,注意这里是事件时间 .window(TumblingEventTimeWindows.of(Time.seconds(10))) //定义窗口关闭延迟,就是允许最大的迟到数据,由于上面设置的最大延迟为2s,在加上这个2s那么就是 //允许最大的迟到数据为4秒 .allowedLateness(Time.seconds(2)) //使用定义的标识 .sideOutputLateData(outputTag) .reduce(new ReduceFunction
             
              >() { @Override public Tuple2
              
                reduce(Tuple2
               
                 value1, Tuple2
                
                  value2) throws Exception { return Tuple2.of(value1.f0, value2.f1 + value1.f1); } }); // .aggregate();下面就可以定义处理函数进行处理 result.print("result"); //得到侧输出流的数据 DataStream
                 
                  > sideOutput = result.getSideOutput(outputTag); sideOutput.print("late"); env.execute(); } } 
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
   
  

在linux里面使用nc

nc -lk 9999

然后输入

a 1
a 15
a 2

得到的结果

result> (a,1000)
late> (a,2000)

原理图

 

本文《Flink入门到实战-阶段四(时间和窗口图解)》版权归顶尖高手养成计划所有,引用Flink入门到实战-阶段四(时间和窗口图解)需遵循CC 4.0 BY-SA版权协议。


推荐阅读
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 开发笔记:Java是如何读取和写入浏览器Cookies的
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Java是如何读取和写入浏览器Cookies的相关的知识,希望对你有一定的参考价值。首先我 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
  • 使用eclipse创建一个Java项目的步骤
    本文介绍了使用eclipse创建一个Java项目的步骤,包括启动eclipse、选择New Project命令、在对话框中输入项目名称等。同时还介绍了Java Settings对话框中的一些选项,以及如何修改Java程序的输出目录。 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文介绍了利用ARMA模型对平稳非白噪声序列进行建模的步骤及代码实现。首先对观察值序列进行样本自相关系数和样本偏自相关系数的计算,然后根据这些系数的性质选择适当的ARMA模型进行拟合,并估计模型中的位置参数。接着进行模型的有效性检验,如果不通过则重新选择模型再拟合,如果通过则进行模型优化。最后利用拟合模型预测序列的未来走势。文章还介绍了绘制时序图、平稳性检验、白噪声检验、确定ARMA阶数和预测未来走势的代码实现。 ... [详细]
author-avatar
人走茶凉
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有