热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SparkStreaming之一:StreamingContext解析

1.0前言目前SparkStreaming编程指南地址:http:spark.apache.orgdocslateststreaming-programming-guide.html1.
1.0 前言

目前Spark Streaming编程指南地址:

http://spark.apache.org/docs/latest/streaming-programming-guide.html


1.1 创建StreamingContext对象

1.1.1通过SparkContext创建

源码如下:

def this(sparkContext:SparkContext, batchDuration: Duration) = {

   this(sparkContext,null,batchDuration)

  }

第一参数为sparkContext对象,第二个参数为批次时间;

创建实例:

val ssc = new StreamingContext(sc, Seconds(5))

 

1.1.2通过SparkConf创建

源码如下:

 defthis(conf:SparkConf, batchDuration: Duration) = {

   this(StreamingContext.createNewSparkContext(conf),null,batchDuration)

  }

第一参数为SparkConf对象,第二个参数为批次时间;

创建实例:

   valconf =newSparkConf().setAppName("StreamTest")

val ssc = newStreamingContext(conf,Seconds(5))

 

1.1.3通过SparkConf参数创建

源码如下:

 defthis(

     master: String,

     appName: String,

     batchDuration: Duration,

     sparkHome: String = null,

     jars: Seq[String] = Nil,

     environment: Map[String, String] = Map()) = {

   this(StreamingContext.createNewSparkContext(master,appName, sparkHome, jars, environment),

         null, batchDuration)

  }

第一参数为需要创建SparkConf对象的详细参数,master-spark地址,appName-对象名称,sparkHome- sparkHome环境变量,jars, environment,第二个参数为批次时间;

创建实例:

val ssc = newStreamingContext(“ spark://host:port”, "StreamTest", Seconds(5),      System.getenv("SPARK_HOME"),StreamingContext.jarOfClass(this.getClass))

 

1.1.4通过checkpointfile参数创建

源码如下:

 defthis(path:String, hadoopConf: Configuration) =

    this(null, CheckpointReader.read(path,new SparkConf(), hadoopConf).get,null)

第一参数为checkpoint file的路径,第二个参数为haoop的配置

源码如下:

def this(path:String) = this(path,newConfiguration)

第一参数为checkpoint file的路径

 

1.2创建Dstream监听对象

1.2.1 fileStream

源码如下:

/**

  * Create a input stream that monitors a Hadoop-compatible filesystem

  * for new files and reads them using the given key-value types and inputformat.

  * Files must be written to the monitored directory by "moving"them from another

  * location within the same file system. File names starting with . areignored.

  * @param directory HDFS directory to monitor for new file

  * @tparam K Key type for reading HDFS file

  * @tparam V Value type for reading HDFS file

  * @tparam F Input format for reading HDFS file

  */

 deffileStream[

   K: ClassTag,

   V: ClassTag,

   F <: NewInputFormat[K, V]: ClassTag

 ] (directory: String): InputDStream[(K, V)] = {

   newFileInputDStream[K, V, F](this, directory)

 }

 

 /**

  * Create a input stream that monitors a Hadoop-compatible filesystem

  * for new files and reads them using the given key-value types and inputformat.

  * Files must be written to the monitored directory by "moving"them from another

  * location within the same file system.

  * @param directory HDFS directory to monitor for new file

  * @param filter Function to filter paths to process

  * @param newFilesOnly Should process only new files and ignoreexisting files in the directory

  * @tparam K Key type for reading HDFS file

  * @tparam V Value type for reading HDFS file

  * @tparam F Input format for reading HDFS file

  */

 deffileStream[

   K: ClassTag,

   V: ClassTag,

   F <: NewInputFormat[K, V]: ClassTag

 ] (directory: String, filter: Path => Boolean, newFilesOnly:Boolean): InputDStream[(K, V)] = {

   newFileInputDStream[K, V, F](this, directory, filter, newFilesOnly)

 }

参数:K-读入HDFS的Key的类型,V-读入HDFS的Value的类型,F-读入HDFS     的类型;directory-监听HDFS的路径,filter-对监听HDFS的文件进行过滤的函数,newFilesOnly-是否只监听新增文件;

fileStream可以通过设置filter函数,对监听目录下的文件进行过滤,只对满足条件的文件进行监听和处理;

默认过滤方法:

 defdefaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

该方法是过滤以隐藏文件。

fileStream可以通过设置newFilesOnly为TRUE或者FALES,是否处理监听目录下已存在的文件,默认是不处理已存在文件,只处理新增加文件,如果设置为FALES,可以处理前一个窗口时间内的老文件。

源码如下:

 privatevalinitialModTimeIgnoreThreshold =if (newFilesOnly) System.currentTimeMillis()else0L

 

val modTimeIgnoreThreshold = math.max(

        initialModTimeIgnoreThreshold,   // initialthreshold based on newFilesOnly setting

        currentTime -durationToRemember.milliseconds  // trailing end of the remember window

      )

modTimeIgnoreThreshold是时间窗口过滤条件,通过newFilesOnly值来取的是当前时间或者前一个窗口时间。

创建实例:

// 创建新过滤函数

   defmyFilter(path:Path): Boolean = path.getName().contains("data")

// 创建fileStream

val data1 = ssc.fileStream[LongWritable,Text, TextInputFormat](Spath1, pa => myFilter(pa),false).map(_._2.toString)

 

1.2.2 textFileStream

源码如下:

/**

  * Create a input stream that monitors a Hadoop-compatible filesystem

  * for new files and reads them as text files (using key as LongWritable,value

  * as Text and input format as TextInputFormat). Files must be written tothe

  * monitored directory by "moving" them from another locationwithin the same

  * file system. File names starting with . are ignored.

  * @param directory HDFS directory to monitor for new file

  */

 deftextFileStream(directory: String): DStream[String] = {

   fileStream[LongWritable, Text,TextInputFormat](directory).map(_._2.toString)

 }

参数:directory监听的目录;

其实textFileStream是fileStream的一个实例。

创建实例:

   valStreamFile1=ssc.textFileStream(Spath1)

 

1.2.3 socketTextStream

源码如下:

 /**

  * Create a input stream from TCP source hostname:port. Data is receivedusing

  * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n`delimited

  * lines.

  * @param hostname     Hostname to connect to for receiving data

  * @param port          Portto connect to for receiving data

  * @param storageLevel Storage level to use for storing the received objects

  *                      (default:StorageLevel.MEMORY_AND_DISK_SER_2)

  */

 defsocketTextStream(

     hostname: String,

     port: Int,

     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

   ): ReceiverInputDStream[String] = {

   socketStream[String](hostname, port, SocketReceiver.bytesToLines,storageLevel)

  }

参数:hostname是主机IP,port是端口号,storageLevel数据的存储级别,默认2份MEMORY_AND_DISK;

创建实例:

val lines = ssc.socketTextStream(serverIP, serverPort);

 

1.2.4 rawSocketStream

源码如下:

 /**

  * Create a input stream from network source hostname:port, where data isreceived

  * as serialized blocks (serialized using the Spark's serializer) thatcan be directly

  * pushed into the block manager without deserializing them. This is themost efficient

  * way to receive data.

  * @param hostname     Hostname to connect to for receiving data

  * @param port          Portto connect to for receiving data

  * @param storageLevel Storage level to use for storing the received objects

  *                      (default:StorageLevel.MEMORY_AND_DISK_SER_2)

  * @tparam T            Typeof the objects in the received blocks

  */

 defrawSocketStream[T: ClassTag](

     hostname: String,

     port: Int,

     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

   ): ReceiverInputDStream[T] = {

   newRawInputDStream[T](this, hostname, port, storageLevel)

  }

rawSocketStream类似于socketTextStream;参照socketTextStream。

 

1.2.5 networkStream

源码如下:

 /**

  * Create an input stream with any arbitrary user implemented receiver.

  * Find more details at:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

  * @param receiver Custom implementation of Receiver

  */

 @deprecated("Use receiverStream","1.0.0")

 defnetworkStream[T: ClassTag](

   receiver: Receiver[T]): ReceiverInputDStream[T] = {

   receiverStream(receiver)

  }

创建实例:

参照:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 

1.2.6 receiverStream

源码如下:

 /**

  * Create an input stream with any arbitrary user implemented receiver.

  * Find more details at:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

  * @param receiver Custom implementation of Receiver

  */

 defreceiverStream[T: ClassTag](

   receiver: Receiver[T]): ReceiverInputDStream[T] = {

   newPluggableInputDStream[T](this, receiver)

  }

创建实例:

    val StreamFile1 = ssc.receiverStream (newCustomReceiver(host, port))

参照:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 

1.2.7 actorStream

源码如下:

 /**

  * Create an input stream with any arbitrary user implemented actorreceiver.

  * Find more details at:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

  * @param props Props object defining creation of the actor

  * @param name Name of the actor

  * @param storageLevel RDD storage level (default:StorageLevel.MEMORY_AND_DISK_SER_2)

  *

  * @note An important point to note:

  *       Since Actor may exist outsidethe spark framework, It is thus user's responsibility

  *       to ensure the type safety,i.e parametrized type of data received and actorStream

  *       should be same.

  */

 defactorStream[T: ClassTag](

     props: Props,

     name: String,

     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,

     supervisorStrategy: SupervisorStrategy =ActorSupervisorStrategy.defaultStrategy

   ): ReceiverInputDStream[T] = {

   receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))

  }

创建实例:

val StreamFile1 = ssc.actorStream[String](Props(newCustomActor()),"CustomReceiver")

参照:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

 

1.2.8 queueStream

源码如下:

/**

  * Create an input stream from a queue of RDDs. In each batch,

  * it will process either one or all of the RDDs returned by the queue.

  * @param queue      Queueof RDDs

  * @param oneAtATime Whether only one RDD should be consumed fromthe queue in every interval

  * @tparam T         Type ofobjects in the RDD

  */

 defqueueStream[T: ClassTag](

     queue: Queue[RDD[T]],

     oneAtATime: Boolean = true

   ): InputDStream[T] = {

   queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))

 }

 

 /**

  * Create an input stream from a queue of RDDs. In each batch,

  * it will process either one or all of the RDDs returned by the queue.

  * @param queue      Queueof RDDs

  * @param oneAtATime Whether only one RDD should be consumed fromthe queue in every interval

  * @param defaultRDD Default RDD is returned by the DStream whenthe queue is empty.

  *                   Set as null ifno RDD should be returned when empty

  * @tparam T         Type ofobjects in the RDD

  */

 defqueueStream[T: ClassTag](

     queue: Queue[RDD[T]],

     oneAtATime: Boolean,

     defaultRDD: RDD[T]

   ): InputDStream[T] = {

   newQueueInputDStream(this, queue, oneAtATime, defaultRDD)

 }

 

1.2.9 union DStream

源码如下:

 /**

  * Create a unified DStream from multiple DStreams of the same type andsame slide duration.

  */

 defunion[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {

   newUnionDStream[T](streams.toArray)

 }

对同一类型的DStream进行合并,生成一个新的DStream,其中要求DStream的数据格式一致,批次时间间隔一致。

 

1.2.10 transform DStream

源码如下:

 /**

  * Create a new DStream in which each RDD is generated by applying afunction on RDDs of

  * the DStreams.

  */

 deftransform[T: ClassTag](

     dstreams: Seq[DStream[_]],

     transformFunc: (Seq[RDD[_]], Time) => RDD[T]

   ): DStream[T] = {

   newTransformedDStream[T](dstreams, sparkContext.clean(transformFunc))

  }

对Dstream进行transform操作生成一个新的Dstream。

 

1.3 Checkpointing

状态的操作是基于多个批次的数据的。它包括基于window的操作和updateStateByKey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔时间是比较合适的。

实例:

   ssc.checkpoint("hdfs://192.168.1.100:9000/check")

   valStreamFile1=ssc.textFileStream(Spath1)

StreamFile1.checkpoint(Seconds(30))

 

转载请注明出处:

http://blog.csdn.net/sunbow0/article/details/42966467




推荐阅读
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文介绍了UVALive6575题目Odd and Even Zeroes的解法,使用了数位dp和找规律的方法。阶乘的定义和性质被介绍,并给出了一些例子。其中,部分阶乘的尾零个数为奇数,部分为偶数。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 学习Java异常处理之throws之抛出并捕获异常(9)
    任务描述本关任务:在main方法之外创建任意一个方法接收给定的两个字符串,把第二个字符串的长度减1生成一个整数值,输出第一个字符串长度是 ... [详细]
  • 本文介绍了Android中的assets目录和raw目录的共同点和区别,包括获取资源的方法、目录结构的限制以及列出资源的能力。同时,还解释了raw目录中资源文件生成的ID,并说明了这些目录的使用方法。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
author-avatar
珠珠VS胖胖
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有