热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark学习笔记(26)在DStream的Action操作之外也可能产生Job操作

本期内容:1.SparkStreaming产生Job的机制2.SparkStreaming的其它产生Job的方式
本期内容:
1. Spark Streaming产生Job的机制
2. Spark Streaming的其它产生Job的方式 

1. Spark Streaming产生Job的机制

Scala程序中,函数可以作为参数传递,因为函数也是对象。有函数对象不意味着函数马上就运行。Spark Streaming中,常利用线程的run来调用函数,从而导致函数的最终运行。
Spark Streaming中,Job对象包含函数成员。

NetworkWordCount程序中,DStream.print导致了Job的产生。
DStream.print:

  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum =  rdd.take (num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println("Time: " + time)
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
     foreachRDD (context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

Spark Streaming应用程序中,除了print,saveAsObjectFiles、saveAsTextFiles 等也能调用foreachRDD,生成ForEachDStream,才能在后面产生Job。
DStream.foreachRDD:

 private def foreachRDD(
                          foreachFunc: (RDD[T], Time) => Unit,
                          displayInnerRDDOps: Boolean): Unit = {
     new ForEachDStream (this,
      context.sparkContext.clean( foreachFunc , false), displayInnerRDDOps). register ()
  }

通过register注册,新生成的ForEachDStream加入到DStreamGraph的成员outputDStreams中。
如果没有print、count、saveAsObjectFiles、saveAsTextFiles等这样的代码,DStreamGraph中outputDStreams就为空,那么DStreamGraph.generateJobs就产生结果呢?
DStreamGraph.generateJobs:

  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
       outputStreams.flatMap  { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

DStreamGraph.generateJobs就会产生空的Job序列。

通过对DStream(或其子类)定制自己的方法,可以使foreachFunc的定义中不含有RDD.take这样的语句。
这样的话,foreachRDD中的foreachFunc不一定会产生Job。如果其中的函数foreachFunc里面没有Action操作,就不会触发Job。

2. Spark Streaming的其它产生Job的方式 

一定要action才会有Job吗?不是。ForEachDStream.transform就可能产生Job。ForEachDStream.transform有两个定义,是调用关系。

ForEachDStream.transform:

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag]( transformFunc : RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedF = context.sparkContext.clean( transformFunc , false)
    transform((r: RDD[T], t: Time) => cleanedF(r))
  }

  /**
   * Return a new DStream in which each RDD is generated by applying a function
   * on each RDD of 'this' DStream.
   */
  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    val cleanedF = context.sparkContext.clean(transformFunc, false)
    val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
      assert(rdds.length == 1)
      cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
    }
     new TransformedDStream [U](Seq(this), realTransformFunc)
  }

其中的函数类型的参数transformFunc是输入RDD并产生一个新的RDD。最终实际会生成TransformedDStream对象。

在第8课中提到过,一般的DStream子类的Compute方法,仅仅是调用父类DStream的getOrCompute,而TransformedDStream的compte方法不是这样。
TransformedDStream.compute:

  override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
      // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
      throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
    }
    val transformedRDD =  transformFunc (parentRDDs, validTime)
    if (transformedRDD == null) {
      throw new SparkException("Transform function must not return null. " +
        "Return SparkContext.emptyRDD() instead to represent no element " +
        "as the result of transformation.")
    }
    Some(transformedRDD)
  }

和别的DStream子类不同,TransformedDStream的compute方法还调用了transformFunc,函数transformFunc是被马上执行的。这就不会等到JobScheduler调度后再执行。
transformFunc 其中如果有count、print等action操作,就也会触发这个Job的执行。这其实可以理解为是个漏洞。
此前说的各种操作是lazy级别,不能马上拿到结果。而由于transformFunc不接受Spark的统一调度,这样可以根据计算结果做出判断再后续操作。不会因为lazy级别而不能必须做后续的transform。

推荐阅读
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 标题: ... [详细]
  • Html5-Canvas实现简易的抽奖转盘效果
    本文介绍了如何使用Html5和Canvas标签来实现简易的抽奖转盘效果,同时使用了jQueryRotate.js旋转插件。文章中给出了主要的html和css代码,并展示了实现的基本效果。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 本文介绍了在处理不规则数据时如何使用Python自动提取文本中的时间日期,包括使用dateutil.parser模块统一日期字符串格式和使用datefinder模块提取日期。同时,还介绍了一段使用正则表达式的代码,可以支持中文日期和一些特殊的时间识别,例如'2012年12月12日'、'3小时前'、'在2012/12/13哈哈'等。 ... [详细]
  • 本文介绍了在iOS开发中使用UITextField实现字符限制的方法,包括利用代理方法和使用BNTextField-Limit库的实现策略。通过这些方法,开发者可以方便地限制UITextField的字符个数和输入规则。 ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
  • 本文介绍了一种在PHP中对二维数组根据某个字段进行排序的方法,以年龄字段为例,按照倒序的方式进行排序,并给出了具体的代码实现。 ... [详细]
  • Todayatworksomeonetriedtoconvincemethat:今天在工作中有人试图说服我:{$obj->getTableInfo()}isfine ... [详细]
  • php缓存ri,浅析ThinkPHP缓存之快速缓存(F方法)和动态缓存(S方法)(日常整理)
    thinkPHP的F方法只能用于缓存简单数据类型,不支持有效期和缓存对象。S()缓存方法支持有效期,又称动态缓存方法。本文是小编日常整理有关thinkp ... [详细]
  • Oracle seg,V$TEMPSEG_USAGE与Oracle排序的关系及使用方法
    本文介绍了Oracle seg,V$TEMPSEG_USAGE与Oracle排序之间的关系,V$TEMPSEG_USAGE是V_$SORT_USAGE的同义词,通过查询dba_objects和dba_synonyms视图可以了解到它们的详细信息。同时,还探讨了V$TEMPSEG_USAGE的使用方法。 ... [详细]
  • 集合的遍历方式及其局限性
    本文介绍了Java中集合的遍历方式,重点介绍了for-each语句的用法和优势。同时指出了for-each语句无法引用数组或集合的索引的局限性。通过示例代码展示了for-each语句的使用方法,并提供了改写为for语句版本的方法。 ... [详细]
author-avatar
扈菁雄佳纯
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有