热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark(十)SparkStreamingAPI编程

本文测试的Spark版本是1.3.1SparkStreaming编程模型:第一步:需要一个StreamingContext对象,该对象是SparkStreaming操作的入口,而

本文测试的Spark版本是1.3.1

Spark Streaming编程模型:

第一步:
需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数:
1、SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息
2、Seconds对象:该对象设置了StreamingContext多久读取一次数据流

第二步:
构建好入口对象之后,直接调用该入口的方法读取各种不同方式传输过来的数据流,如:Socket,HDFS等方式。并会将数据转换成DStream对象进行统一操作

第三步:
DStream本身是一种RDD序列,Streaming接受数据流之后会进行切片,每个片都是一个RDD,而这些RDD最后都会包装到一个DStream对象中统一操作。在这个步骤中,进行对数据的业务处理

第四步:
调用入口对象的start和awaitTermination开始读取数据流

下面分别使用不同的Spark Streaming 处理方式完成WordCount单词计数

HDFS文件测试

object HDFSWordCount {
  def main(args: Array[String]) {
    //参数设置
    if (args.length <2) {
      System.err.println("Usgae :  ")
      System.exit(1)
    }

    //第一步:创建StreamingContext入口
    val sparkCOnf= new SparkConf().setMaster(args(0)).setAppName("HDFSWordCount")
    val streaming = new StreamingContext(sparkConf,Seconds(10))
    //第二步:调用textFileStream读取指定路径的文件
    val data = streaming.textFileStream(args(1))
    //第三步,数据业务处理
    //使用flatMap将数据map之后的分切压成一个DStream
    val words = data.flatMap(_.split(" "))
    val wordCount = words.map(x => (x,1)).reduceByKey(_+_)
    wordCount.print()
    //第四步
    streaming.start()
    streaming.awaitTermination()
}

Socket数据流测试

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length <3) {
      System.err.println("Usage :   ")
      System.exit(1)
    }
    val sparkCOnf= new SparkConf().setMaster(args(0)).setAppName("NetworkWordCount")
    val streaming = new StreamingContext(sparkConf,Seconds(10))
    //参数:1、主机名;2、端口号;3、存储级别
    val data = 
 streaming.socketTextStream(args(1),args(2).toInt,StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCount.print()
    streaming.start()
    streaming.awaitTermination()
}

可以看到,对于同一中业务处理逻辑来说,不同的数据来源只要调用不同的方法接收即可,转换成DStream之后的处理步骤是一模一样的

下面的代码时配合测试Socket数据的,使用java命令执行jar包,传入参数:1、端口号;2、产生数据的频率(毫秒)
即可在指定的端口上产生数据提供Spark Streaming接收

package Streaming

import java.net.ServerSocket
import java.io.PrintWriter

object Logger {
  def generateContent(index: Int): String = {
    import scala.collection.mutable.ListBuffer
    val charList = ListBuffer[Char]()
    for (i <- 65 to 90) {
      charList += i.toChar
    }
    val charArray = charList.toArray
    charArray(index).toString()
  }

  def index = {
    import java.util.Random
    val ran = new Random
    ran.nextInt(7)
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: ")
      System.exit(1)
    }
    val listener = new ServerSocket(args(0).toInt)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run = {
          println("Get client connected from:" + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          while (true) {
            Thread.sleep(args(1).toLong)
            val cOntent= generateContent(index)
            println(content)
            out.write(content + ‘\n‘)
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

在上述的例子中,文中使用的是Seconds(10)
也就是说每10秒钟处理一次数据
第一个10秒处理的结果是不会影响到第二个10秒的
但是有时候我们需要进行汇通统计,要用到之前几个10秒阶段的数据怎么办?

这里要用到一个updateStateByKey方法,该方法会保存上次计算数据的状态,以供下次计算使用。
上代码:

object StatefulWordCount {
  def main(args: Array[String]) {
    if (args.length <3) {
      System.err.println("Usage :   ")
      System.exit(1)
    }
    //定义一个匿名函数,并赋值给updateFunc
    //该函数是updateStateByKey方法的参数,该方法要求传入一个匿名参数且参数格式为values:Seq[Int],state:Option[Int]
    //其中values是当前的数据,state表示之前的数据
    //这个匿名函数的作用就是将各个10秒阶段的结果累加汇总
    val updateFunc = (values:Seq[Int],state:Option[Int]) => {
    val now = values.foldLeft(0)(_+_)
    val old = state.getOrElse(0)
    Some(now + old)
}
    val cOnf= new SparkConf().setAppName("StatefulWordCount").setMaster(args(0))
    val streaming = new StreamingContext(conf, Seconds(10))
    //checkpoint会将数据放在指定的路径上,这个操作是必须的,为了保护数据,如果不设置会报异常
    streaming.checkpoint(".")
    val lines = streaming.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordDStream = words.map(x => (x, 1))
    //在这里将updateFunc传入
    val stateDStream = wordDStream.updateStateByKey(updateFunc)
    stateDStream.print()
    streaming.start()
    streaming.awaitTermination()
}

在Spark Streaming中还有一个window的概念,即滑动窗体

下图是官方文档中给出的解释:

技术分享

使用滑动窗体要设置两个指定参数:
1、窗体长度
2、滑动时间
例如,设置一个窗体长度为5,滑动时间为2,意味着,每2秒处理上一个5秒内的数据流
这样的处理可以应用在例如微博统计最热搜索词
每2秒钟统计一次过去5秒内的最热搜索词

统计最热搜索词实例代码:

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length <3) {
      System.err.println("Usage :      ")
      System.exit(1)
    }
    val cOnf= new SparkConf().setAppName("WindowWordCount").setMaster(args(0))
    val streaming = new StreamingContext(conf, Seconds(args(3).toInt))
    //checkpoint会将数据放在指定的路径上,这个操作是必须的,为了保护数据,如果不设置会报异常
    streaming.checkpoint(".")
    val lines = streaming.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY)
    val words = lines.flatMap(_.split(" "))
    //map操作之后数据的格式为:
    //(a,1)(b,1)...(n,1)格式
    //调用reduceByKeyAndWindow替代普通的reduceByKey
    //最后两个参数分别是窗体长度和滑动时间
    val wordCount = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(args(4).toInt),
      Seconds(args(5).toInt))
    //对结果进行降序排序
    //由于DStream本身不具备RDD的一些操作,调用transform方法可以让RDD的一些操作(例如sortByKey等)作用在其之上,返回的仍然是一个DStream对象
    val sorted = wordCount.map { case (char, count) => (count, char) }.transform(_.sortByKey(false)).map
    { case (count, char) => (char, count) }
    sorted.print()
    streaming.start()
    streaming.awaitTermination()
  }
}

reduceByKeyAndWindow有两种使用方法:
1、educeByKeyAndWindow(_ + _, Seconds(5),seconds(1))
2、reduceByKeyAndWindow(_ + , - _, Seconds(5),seconds(1))

二者的区别见下图:

技术分享

第一种是简单粗暴的直接累加
而第二种方式就显得比较文雅和高效了
例如现在计算t+4的累积数据
第一种方式是,直接从t+…+(t+4)
第二种处理为,用已经计算好的(t+3)的数据加上(t+4)的数据,在减去(t-1)的数据,就可以得到和第一种方式一样的结果,但是中间复用了三个数据(t+1,t+2,t+3)

以上为Spark Streaming API的简单使用

Spark(十) -- Spark Streaming API编程


推荐阅读
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 本文内容为asp.net微信公众平台开发的目录汇总,包括数据库设计、多层架构框架搭建和入口实现、微信消息封装及反射赋值、关注事件、用户记录、回复文本消息、图文消息、服务搭建(接入)、自定义菜单等。同时提供了示例代码和相关的后台管理功能。内容涵盖了多个方面,适合综合运用。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
author-avatar
万花_筒乙姬睦美
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有