数据处理的延迟
在计算开始前已知所有输入数据,输入数据不会产生变化,一般计算量级较大,计算时间也较长。例如今天早上一点,把昨天累积的日志,计算出所需结果。最经典的就是Hadoop的MapReduce方式;
输入数据是可以以序列化的方式一个一个地输入并进行处理,也就是说在开始的时候并不需要知道所有的输入数据。与离线计算相比,运行时间短,计算量级相对较小。强调计算过程的时间要短,即所查当下给出结果。
数据处理的方式
近年来,在Web应用、网络监控、传感监测等领域,兴起了一种新的数据密集型应用——流数据,即数据以大量、快速、时变的流形式持续到达。实例:PM2.5检测、电子商务网站用户点击流。
流数据具有如下特征:
注重数据的整体价值,不过分关注个别数据
Spark流使得构建可扩展的容错流应用程序变得更加容易。
Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark
Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。批处理间隔是Spark Streaming的核心概念和关键参数,它决定了Spark Streaming提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。DStreams可以由来自数据源的输入数据流来创建, 也可以通过在其他的DStreams上应用一些高阶操作来得到。所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。
易整合到Spark体系
缺点
Spark Streaming是一种“微量批处理”架构, 和其他基于“一次处理一条记录”架构的系统相比, 它的延迟会相对高一些。
整体架构图
SparkStreaming架构图
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,这样虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
object StreamWordCount {def main(args: Array[String]): Unit = {//初始化Spark配置信息,Streaming程序执行至少需要两个线程(采集、执行)//不能设置为localval sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//初始化SparkStreamingContext,程序执行入口对象//Seconds 底层调用 new Durationval ssc = new StreamingContext(sparkConf, Seconds(3))//通过监控端口创建DStream,读进来的数据为一行一行的val lineStreams = ssc.socketTextStream("hadoop101", 9999)//将每一行数据做切分,形成一个个单词val wordStreams = lineStreams.flatMap(_.split(" "))//将单词映射成元组(word,1)val wordAndOneStreams = wordStreams.map((_, 1))//将相同的单词次数做统计val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)//打印wordAndCountStreams.print()//启动SparkStreamingContext采集器ssc.start()//默认情况下采集器不能关闭,等待采集结束之后,终止程序ssc.awaitTermination()}
}
# 开启SecureCRT
nc -lk 9999
输入内容
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。
对数据的操作也是按照RDD为单位来进行的
计算过程由Spark Engine来完成
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理
需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
object RDDStream {def main(args: Array[String]) {//1.初始化Spark配置信息val conf &#61; new SparkConf().setMaster("local[*]").setAppName("RDDStream")//2.初始化SparkStreaming上下文环境对象val ssc &#61; new StreamingContext(conf, Seconds(4))//3.创建RDD队列val rddQueue &#61; new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStream&#xff0c;从队列中采集数据&#xff0c;获取DSval inputStream &#61; ssc.queueStream(rddQueue,oneAtATime &#61; false)//5.处理队列中的RDD数据val mappedStream &#61; inputStream.map((_,1))val reducedStream &#61; mappedStream.reduceByKey(_ &#43; _)//6.打印结果reducedStream.print()//7.启动采集器ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i <- 1 to 5) {rddQueue &#43;&#61; ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}
结果展示
-------------------------------------------
Time: 1662277280000 ms
-------------------------------------------
(196,1)
(296,1)
(96,1)
(52,1)
(4,1)
(180,1)
(16,1)
(156,1)
(216,1)
(28,1)
...-------------------------------------------
Time: 1662277284000 ms
-------------------------------------------
(196,2)
(296,2)
(96,2)
(52,2)
(4,2)
(180,2)
(16,2)
(156,2)
(216,2)
(28,2)
...-------------------------------------------
Time: 1662277288000 ms
-------------------------------------------
(196,2)
(296,2)
(96,2)
(52,2)
(4,2)
(180,2)
(16,2)
(156,2)
(216,2)
(28,2)
...-------------------------------------------
Time: 1662277292000 ms
-------------------------------------------
需要继承Receiver&#xff0c;并实现onStart、onStop方法来自定义数据源采集
需求&#xff1a;自定义数据源&#xff0c;实现监控某个端口号&#xff0c;获取该端口号内容
//泛型表示读取数据的类型
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候&#xff0c;调用该方法&#xff0c;作用为&#xff1a;读数据并将数据发送给Sparkoverride def onStart(): Unit &#61; {new Thread("Socket Receiver") {override def run() {receive()}}.start()}//读数据并将数据发送给Spark&#xff0c;真正处理接收数据的逻辑def receive(): Unit &#61; {//创建一个Socket连接var socket: Socket &#61; new Socket(host, port)//定义一个变量&#xff0c;用来接收端口传过来的数据var input: String &#61; null//创建一个BufferedReader用于按行读取端口传来的数据//InputStreamReader 将字节流转换为字符流val reader &#61; new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))//读取一行数据input &#61; reader.readLine()//当receiver没有关闭并且输入数据不为空&#xff0c;则循环发送数据给Sparkwhile (!isStopped() && input !&#61; null) {store(input)input &#61; reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart("restart")}override def onStop(): Unit &#61; {if(socket !&#61; null ){socket.close()socket &#61; null}}
}
object FileStream {def main(args: Array[String]): Unit &#61; {//1.初始化Spark配置信息
val sparkConf &#61; new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")//2.初始化SparkStreamingContextval ssc &#61; new StreamingContext(sparkConf, Seconds(5))//3.创建自定义数据源创建receiver的Streaming
val lineStream &#61; ssc.receiverStream(new CustomerReceiver("hadoop101", 9999))//4.将每一行数据做切分&#xff0c;形成一个个单词val wordStream &#61; lineStream.flatMap(_.split("\t"))//5.将单词映射成元组&#xff08;word,1&#xff09;val wordAndOneStream &#61; wordStream.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStream &#61; wordAndOneStream.reduceByKey(_ &#43; _)//7.打印wordAndCountStream.print()//8.启动SparkStreamingContextssc.start()ssc.awaitTermination()}
}
ReceiverAPI&#xff1a;需要一个专门的Executor去接收数据&#xff0c;然后发送给其他的Executor做计算。存在的问题&#xff0c;接收数据的Executor和计算的Executor速度会有所不同&#xff0c;特别在接收数据的Executor速度大于计算的Executor速度&#xff0c;会导致计算数据的节点内存溢出。早期版本中提供此方式&#xff0c;当前版本不适用。默认情况下&#xff0c;offset维护在zk中。
DirectAPI&#xff1a;是由计算的Executor来主动消费Kafka的数据&#xff0c;速度由自身控制。默认情况下&#xff0c;offseet维护在checkpoint检查点&#xff0c;需要改变SparkStreamingContext的创建方式&#xff1b;也可以手动指定offset维护位置&#xff0c;为了保证数据的精准一致性&#xff0c;一般维护在有事务的存储上。
通过SparkStreaming从Kafka读取数据&#xff0c;并将读取过来的数据做简单计算&#xff0c;最终打印到控制台。
// 通过ReciverAPI连接kafka数据源&#xff0c;获取数据
object Spark04_ReceiverAPI {def main(args: Array[String]): Unit &#61; {//1.创建SparkConfval sparkConf: SparkConf &#61; new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")//2.创建StreamingContext&#xff0c;第二个参数为采集周期val ssc &#61; new StreamingContext(sparkConf, Seconds(3))//3.使用ReceiverAPI读取Kafka数据创建DStreamval kafkaDStream: ReceiverInputDStream[(String, String)] &#61; KafkaUtils.createStream(//Streaming Contextssc,//Zookeeper地址"hadoop101:2181,hadoop102:2181,hadoop103:2181",//groupid&#xff0c;消费者组"bigdata",//k表示主题的名字&#xff0c;v表示主题的分区数Map("mybak" -> 2))//4.计算WordCount并打印 new KafkaProducer[String,String]().send(new ProducerRecord[]())//获取kafka中的消息&#xff0c;只需要v的部分val lineDStream: DStream[String] &#61; kafkaDStream.map(_._2)val word: DStream[String] &#61; lineDStream.flatMap(_.split(" "))val wordToOneDStream: DStream[(String, Int)] &#61; word.map((_, 1))val wordToCountDStream: DStream[(String, Int)] &#61; wordToOneDStream.reduceByKey(_ &#43; _)wordToCountDStream.print()//5.开启任务ssc.start()ssc.awaitTermination()}
}
# 启动Zookeeper
zk start
# 启动kafka
kafka start
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop101:9092
# 创建一个和上述代码中相同的主题
bin/kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic bigdata --partitions 2 --replication-factor 2
# 生产者向topic发送消息
bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic bigdata
# 启动上述代码&#xff0c;查看是否可以连接到kafka&#xff0c;并且接收到生产者传来的消息
# 发送的内容
hello word
hello scala
hello java
0-8 Receive模式&#xff0c;offset维护在zk中&#xff0c;程序停止后&#xff0c;继续生产数据&#xff0c;再次启动程序&#xff0c;仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看&#xff0c;注意会存在一些延迟