热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

【SparkStreaming】概述、DS入门、DS创建

文章目录一SparkStreaming概述0基本概念(1)离线和实时概念(2)批量和流式概念1SparkStreaming是


文章目录

  • 一 SparkStreaming概述
    • 0 基本概念
      • (1)离线和实时概念
      • (2)批量和流式概念
    • 1 Spark Streaming是什么
    • 2 Spark Streaming的特点
    • 3 Spark Streaming架构
      • (1)架构图
      • (2)背压机制
  • 二 Dstream入门
    • 1 WordCount案例实操
      • (1)添加依赖
      • (2)编写代码
      • (3)启动程序并通过netcat发送数据
    • 2 WordCount解析
    • 3 一些注意点
  • 三 DStream创建
    • 1 RDD队列
      • (1)用法及说明
      • (2)案例实操
    • 2 自定义数据源
      • (1)用法及说明
      • (2)案例实操
        • 自定义数据源
        • 使用自定义的数据源采集数据
    • 3 Kafka数据源
      • (1)版本选型
      • (2)Kafka 0-8 Receive模式
        • 需求
        • 导入依赖
        • 编写代码
        • 生产数据


一 SparkStreaming概述


0 基本概念


(1)离线和实时概念

数据处理的延迟


  • 离线计算

在计算开始前已知所有输入数据,输入数据不会产生变化,一般计算量级较大,计算时间也较长。例如今天早上一点,把昨天累积的日志,计算出所需结果。最经典的就是Hadoop的MapReduce方式;


  • 实时计算

输入数据是可以以序列化的方式一个一个地输入并进行处理,也就是说在开始的时候并不需要知道所有的输入数据。与离线计算相比,运行时间短,计算量级相对较小。强调计算过程的时间要短,即所查当下给出结果。


(2)批量和流式概念

数据处理的方式


  • 批:处理离线数据,冷数据。单个处理数据量大,处理速度比流慢。
  • 流:在线,实时产生的数据。单次处理的数据量小,但处理速度更快。

近年来,在Web应用、网络监控、传感监测等领域,兴起了一种新的数据密集型应用——流数据,即数据以大量、快速、时变的流形式持续到达。实例:PM2.5检测、电子商务网站用户点击流。

流数据具有如下特征:


  • 数据快速持续到达,潜在大小也许是无穷无尽的
  • 数据来源众多,格式复杂
  • 数据量大,但是不十分关注存储,一旦经过处理,要么被丢弃,要么被归档存储

注重数据的整体价值,不过分关注个别数据


1 Spark Streaming是什么

Spark流使得构建可扩展的容错流应用程序变得更加容易。

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
在这里插入图片描述

在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark
Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。批处理间隔是Spark Streaming的核心概念和关键参数,它决定了Spark Streaming提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。
在这里插入图片描述

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。DStreams可以由来自数据源的输入数据流来创建, 也可以通过在其他的DStreams上应用一些高阶操作来得到。所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。


2 Spark Streaming的特点


  • 易用

在这里插入图片描述


  • 容错

在这里插入图片描述


  • 易整合到Spark体系
    在这里插入图片描述

  • 缺点

    Spark Streaming是一种“微量批处理”架构, 和其他基于“一次处理一条记录”架构的系统相比, 它的延迟会相对高一些。


3 Spark Streaming架构


(1)架构图

整体架构图

在这里插入图片描述

SparkStreaming架构图

在这里插入图片描述


(2)背压机制

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,这样虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。


二 Dstream入门


1 WordCount案例实操

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数


(1)添加依赖

org.apache.sparkspark-streaming_2.123.0.0


(2)编写代码

object StreamWordCount {def main(args: Array[String]): Unit = {//初始化Spark配置信息,Streaming程序执行至少需要两个线程(采集、执行)//不能设置为localval sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//初始化SparkStreamingContext,程序执行入口对象//Seconds 底层调用 new Durationval ssc = new StreamingContext(sparkConf, Seconds(3))//通过监控端口创建DStream,读进来的数据为一行一行的val lineStreams = ssc.socketTextStream("hadoop101", 9999)//将每一行数据做切分,形成一个个单词val wordStreams = lineStreams.flatMap(_.split(" "))//将单词映射成元组(word,1)val wordAndOneStreams = wordStreams.map((_, 1))//将相同的单词次数做统计val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)//打印wordAndCountStreams.print()//启动SparkStreamingContext采集器ssc.start()//默认情况下采集器不能关闭,等待采集结束之后,终止程序ssc.awaitTermination()}
}

(3)启动程序并通过netcat发送数据

# 开启SecureCRT
nc -lk 9999
输入内容

2 WordCount解析

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。

在这里插入图片描述

对数据的操作也是按照RDD为单位来进行的

在这里插入图片描述

计算过程由Spark Engine来完成

在这里插入图片描述


3 一些注意点


  • 一旦StreamingContext已经启动, 则不能再添加新的 streaming computations
  • 一旦一个StreamingContext已经停止(StreamingContext.stop()), 他也不能再重启
  • 在一个 JVM 内, 同一时间只能启动一个StreamingContext
  • stop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)
  • 一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉

三 DStream创建


1 RDD队列


(1)用法及说明

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理


(2)案例实操

需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

object RDDStream {def main(args: Array[String]) {//1.初始化Spark配置信息val conf &#61; new SparkConf().setMaster("local[*]").setAppName("RDDStream")//2.初始化SparkStreaming上下文环境对象val ssc &#61; new StreamingContext(conf, Seconds(4))//3.创建RDD队列val rddQueue &#61; new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStream&#xff0c;从队列中采集数据&#xff0c;获取DSval inputStream &#61; ssc.queueStream(rddQueue,oneAtATime &#61; false)//5.处理队列中的RDD数据val mappedStream &#61; inputStream.map((_,1))val reducedStream &#61; mappedStream.reduceByKey(_ &#43; _)//6.打印结果reducedStream.print()//7.启动采集器ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i <- 1 to 5) {rddQueue &#43;&#61; ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}

结果展示

-------------------------------------------
Time: 1662277280000 ms
-------------------------------------------
(196,1)
(296,1)
(96,1)
(52,1)
(4,1)
(180,1)
(16,1)
(156,1)
(216,1)
(28,1)
...-------------------------------------------
Time: 1662277284000 ms
-------------------------------------------
(196,2)
(296,2)
(96,2)
(52,2)
(4,2)
(180,2)
(16,2)
(156,2)
(216,2)
(28,2)
...-------------------------------------------
Time: 1662277288000 ms
-------------------------------------------
(196,2)
(296,2)
(96,2)
(52,2)
(4,2)
(180,2)
(16,2)
(156,2)
(216,2)
(28,2)
...-------------------------------------------
Time: 1662277292000 ms
-------------------------------------------

2 自定义数据源


&#xff08;1&#xff09;用法及说明

需要继承Receiver&#xff0c;并实现onStart、onStop方法来自定义数据源采集


&#xff08;2&#xff09;案例实操

需求&#xff1a;自定义数据源&#xff0c;实现监控某个端口号&#xff0c;获取该端口号内容


自定义数据源

//泛型表示读取数据的类型
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候&#xff0c;调用该方法&#xff0c;作用为&#xff1a;读数据并将数据发送给Sparkoverride def onStart(): Unit &#61; {new Thread("Socket Receiver") {override def run() {receive()}}.start()}//读数据并将数据发送给Spark&#xff0c;真正处理接收数据的逻辑def receive(): Unit &#61; {//创建一个Socket连接var socket: Socket &#61; new Socket(host, port)//定义一个变量&#xff0c;用来接收端口传过来的数据var input: String &#61; null//创建一个BufferedReader用于按行读取端口传来的数据//InputStreamReader 将字节流转换为字符流val reader &#61; new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))//读取一行数据input &#61; reader.readLine()//当receiver没有关闭并且输入数据不为空&#xff0c;则循环发送数据给Sparkwhile (!isStopped() && input !&#61; null) {store(input)input &#61; reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart("restart")}override def onStop(): Unit &#61; {if(socket !&#61; null ){socket.close()socket &#61; null}}
}

使用自定义的数据源采集数据

object FileStream {def main(args: Array[String]): Unit &#61; {//1.初始化Spark配置信息
val sparkConf &#61; new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")//2.初始化SparkStreamingContextval ssc &#61; new StreamingContext(sparkConf, Seconds(5))//3.创建自定义数据源创建receiver的Streaming
val lineStream &#61; ssc.receiverStream(new CustomerReceiver("hadoop101", 9999))//4.将每一行数据做切分&#xff0c;形成一个个单词val wordStream &#61; lineStream.flatMap(_.split("\t"))//5.将单词映射成元组&#xff08;word,1&#xff09;val wordAndOneStream &#61; wordStream.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStream &#61; wordAndOneStream.reduceByKey(_ &#43; _)//7.打印wordAndCountStream.print()//8.启动SparkStreamingContextssc.start()ssc.awaitTermination()}
}

3 Kafka数据源


&#xff08;1&#xff09;版本选型

ReceiverAPI&#xff1a;需要一个专门的Executor去接收数据&#xff0c;然后发送给其他的Executor做计算。存在的问题&#xff0c;接收数据的Executor和计算的Executor速度会有所不同&#xff0c;特别在接收数据的Executor速度大于计算的Executor速度&#xff0c;会导致计算数据的节点内存溢出。早期版本中提供此方式&#xff0c;当前版本不适用。默认情况下&#xff0c;offset维护在zk中。

DirectAPI&#xff1a;是由计算的Executor来主动消费Kafka的数据&#xff0c;速度由自身控制。默认情况下&#xff0c;offseet维护在checkpoint检查点&#xff0c;需要改变SparkStreamingContext的创建方式&#xff1b;也可以手动指定offset维护位置&#xff0c;为了保证数据的精准一致性&#xff0c;一般维护在有事务的存储上。

在这里插入图片描述


&#xff08;2&#xff09;Kafka 0-8 Receive模式


需求

通过SparkStreaming从Kafka读取数据&#xff0c;并将读取过来的数据做简单计算&#xff0c;最终打印到控制台。


导入依赖

org.apache.sparkspark-streaming-kafka-0-8_2.112.4.5


编写代码

// 通过ReciverAPI连接kafka数据源&#xff0c;获取数据
object Spark04_ReceiverAPI {def main(args: Array[String]): Unit &#61; {//1.创建SparkConfval sparkConf: SparkConf &#61; new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")//2.创建StreamingContext&#xff0c;第二个参数为采集周期val ssc &#61; new StreamingContext(sparkConf, Seconds(3))//3.使用ReceiverAPI读取Kafka数据创建DStreamval kafkaDStream: ReceiverInputDStream[(String, String)] &#61; KafkaUtils.createStream(//Streaming Contextssc,//Zookeeper地址"hadoop101:2181,hadoop102:2181,hadoop103:2181",//groupid&#xff0c;消费者组"bigdata",//k表示主题的名字&#xff0c;v表示主题的分区数Map("mybak" -> 2))//4.计算WordCount并打印 new KafkaProducer[String,String]().send(new ProducerRecord[]())//获取kafka中的消息&#xff0c;只需要v的部分val lineDStream: DStream[String] &#61; kafkaDStream.map(_._2)val word: DStream[String] &#61; lineDStream.flatMap(_.split(" "))val wordToOneDStream: DStream[(String, Int)] &#61; word.map((_, 1))val wordToCountDStream: DStream[(String, Int)] &#61; wordToOneDStream.reduceByKey(_ &#43; _)wordToCountDStream.print()//5.开启任务ssc.start()ssc.awaitTermination()}
}

生产数据

# 启动Zookeeper
zk start
# 启动kafka
kafka start
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop101:9092
# 创建一个和上述代码中相同的主题
bin/kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic bigdata --partitions 2 --replication-factor 2
# 生产者向topic发送消息
bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic bigdata
# 启动上述代码&#xff0c;查看是否可以连接到kafka&#xff0c;并且接收到生产者传来的消息
# 发送的内容
hello word
hello scala
hello java

0-8 Receive模式&#xff0c;offset维护在zk中&#xff0c;程序停止后&#xff0c;继续生产数据&#xff0c;再次启动程序&#xff0c;仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看&#xff0c;注意会存在一些延迟


推荐阅读
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • 如何优化Webpack打包后的代码分割
    本文介绍了如何通过优化Webpack的代码分割来减小打包后的文件大小。主要包括拆分业务逻辑代码和引入第三方包的代码、配置Webpack插件、异步代码的处理、代码分割重命名、配置vendors和cacheGroups等方面的内容。通过合理配置和优化,可以有效减小打包后的文件大小,提高应用的加载速度。 ... [详细]
  • {moduleinfo:{card_count:[{count_phone:1,count:1}],search_count:[{count_phone:4 ... [详细]
  • 浅析对象 VO、DTO、DO、PO 概念
    作者|CatQi链接|cnblogs.comqixuejiap4390086.html前言由于此订阅号换了个皮肤,导致用户接受文章不及时。读者可以打开订阅号「Web项 ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 1、概述首先和大家一起回顾一下Java消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:然后在另一篇博客《Java消息队列-ActiveMq实战》中 ... [详细]
  • 《Spark核心技术与高级应用》——1.2节Spark的重要扩展
    本节书摘来自华章社区《Spark核心技术与高级应用》一书中的第1章,第1.2节Spark的重要扩展,作者于俊向海代其锋马海平,更多章节内容可以访问云栖社区“华章社区”公众号查看1. ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • celery 爬虫使用
    简介celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它由三部分组成,消息中间件, ... [详细]
  • Zookeeper为分布式环境提供灵活的协调基础架构。ZooKeeper框架支持许多当今最好的工业应用程序。我们将在本章中讨论ZooKeeper的一些最显着的应用。雅虎ZooKee ... [详细]
  • Azkaban(三)Azkaban的使用
    界面介绍首页有四个菜单projects:最重要的部分,创建一个工程,所有flows将在工程中运行。scheduling:显示定时任务executing:显示当前运行的任务histo ... [详细]
author-avatar
a171759015_753
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有