热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

第8章SparkStreaming进阶与案例实战

第8章SparkStreaming进阶与案例实战本章节将讲解SparkStreaming如何处理带状态的数据,通过案例让大家知道SparkStreaming如何写数据到MySQL,
第8章 Spark Streaming进阶与案例实战

本章节将讲解Spark Streaming如何处理带状态的数据,通过案例让大家知道Spark Streaming如何写数据到MySQL,Spark Streaming如何整合Spark SQL进行操作

[TOC]

8-1 -课程目录

  • 带状态的算子:UpdateStateByKey
  • 实战:计算到目前为止累计出现的单词个数写入到Mysqlzhon
  • 基于window的统计
  • 实战:黑名单过滤
  • 实战:Spark Streaming整合Spark SQL实战

8-2 -实战之updateStateByKey算子的使用

带状态的算子:UpdateStateByKey

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you will have to do two steps.

  1. Define the state – The state can be an arbitrary data type.
  2. Define the state update function – Specify with a function how to update the state using the previous state and the new values from an input stream.

updateStateByKey算子
需求:统计到目前为止累计出现的单词的个数(需要保持住以前的状态)

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Spark Streaming完成有状态统计
* 累计求和功能
*/
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 如果使用了stateful的算子,必须设置checkpoint
// 在生产环境中,建议大家把checkpoint设置到HDFS的某个文件夹中
ssc.checkpoint(".")
val lines = ssc.socketTextStream("hadoop000", 6789)
val result = lines.flatMap(_.split(" ").map((_, 1)))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 把当前的数据去更新已有的或者是老的数据
*
* @param currentValues
* @param preValues
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}

开启6789端口

$ nc -lk 6789

错误

java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

8-3 -实战之将统计结果写入到MySQL数据库中

实战:计算到目前为止累计出现的单词个数写入到MySQL

  • 使用Spark Streaming进行统计分析
  • Spark Streaming统计结果写入到MySQL

需求:将统计结果写入到MySQL

create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);

安装mysql8.0.11,修改密码,设置时区

java.sql.SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.

set global time_zOne='+8:00';

通过该sql将统计结果写入到MySQL
insert into wordcount(word, wordcount) values(‘” + record._1 + “‘,” + record._2 + “)”

存在的问题:

  1. 对于已有的数据做更新,而是所有的数据均为insert
    改进思路:
    a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
    b) 工作中:HBase/Redis

  2. 每个rdd的partition创建connection,建议大家改成连接池

8-4 -实战之窗口函数的使用

  • 基于window的统计
  • http://spark.apache.org/docs/latest/streaming-programming-guide.html#window-operations

《第8章 Spark Streaming进阶与案例实战》 基于window的统计

window:定时的进行一个时间段内的数据处理

  • window length : 窗口的长度
  • sliding interval: 窗口的间隔

这2个参数和我们的batch size有关系:倍数

每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc

==> 每隔sliding interval统计前window length的值

《第8章 Spark Streaming进阶与案例实战》 Spark Streaming滑动窗口

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

8-5 -实战之黑名单过滤

实战:黑名单过滤

  • transform算子的使用
  • Spark Streaming 整合RDD进行操作

需求: 黑名单过滤

访问日志 ==> DStream
20180808,zs
20180808,ls
20180808,ww
==> (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)

黑名单列表 ==> RDD
zs
ls
==>(zs: true)(ls: true)

==> 20180808,ww

leftjoin
(zs: [<20180808,zs>, ]) x
(ls: [<20180808,ls>, ]) x
(ww: [<20180808,ww>, ]) ==> tuple 1

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* transform:黑名单过滤
*
*/
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/**
* 构建黑名单
*/
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))
val lines = ssc.socketTextStream("192.168.95.131", 6789)
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse(false) != true)
.map(x => x._2._1)
//getOrElse(false) != true 留下等于false的,等于true的过滤
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}

8-6 -实战之Spark Streaming整合Spark SQL操作

package com.imooc.spark
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
* Spark Streaming 完成Spark SQL完成词频统计操作
*/
object SqlNetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkCOnf= new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("192.168.95.131", 6789)
val words = lines.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
}

Boy-201806015


推荐阅读
  • 本文详细解析了Python中的os和sys模块,介绍了它们的功能、常用方法及其在实际编程中的应用。 ... [详细]
  • 使用Vultr云服务器和Namesilo域名搭建个人网站
    本文详细介绍了如何通过Vultr云服务器和Namesilo域名搭建一个功能齐全的个人网站,包括购买、配置服务器以及绑定域名的具体步骤。文章还提供了详细的命令行操作指南,帮助读者顺利完成建站过程。 ... [详细]
  • 本文探讨了 Objective-C 中的一些重要语法特性,包括 goto 语句、块(block)的使用、访问修饰符以及属性管理等。通过实例代码和详细解释,帮助开发者更好地理解和应用这些特性。 ... [详细]
  • 本文探讨了如何在给定整数N的情况下,找到两个不同的整数a和b,使得它们的和最大,并且满足特定的数学条件。 ... [详细]
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 本实验主要探讨了二叉排序树(BST)的基本操作,包括创建、查找和删除节点。通过具体实例和代码实现,详细介绍了如何使用递归和非递归方法进行关键字查找,并展示了删除特定节点后的树结构变化。 ... [详细]
  • andr ... [详细]
  • Hadoop入门与核心组件详解
    本文详细介绍了Hadoop的基础知识及其核心组件,包括HDFS、MapReduce和YARN。通过本文,读者可以全面了解Hadoop的生态系统及应用场景。 ... [详细]
  • 本教程涵盖OpenGL基础操作及直线光栅化技术,包括点的绘制、简单图形绘制、直线绘制以及DDA和中点画线算法。通过逐步实践,帮助读者掌握OpenGL的基本使用方法。 ... [详细]
  • Scala 实现 UTF-8 编码属性文件读取与克隆
    本文介绍如何使用 Scala 以 UTF-8 编码方式读取属性文件,并实现属性文件的克隆功能。通过这种方式,可以确保配置文件在多线程环境下的一致性和高效性。 ... [详细]
  • 解读MySQL查询执行计划的详细指南
    本文旨在帮助开发者和数据库管理员深入了解如何解读MySQL查询执行计划。通过详细的解析,您将掌握优化查询性能的关键技巧,了解各种访问类型和额外信息的含义。 ... [详细]
  • 扫描线三巨头 hdu1928hdu 1255  hdu 1542 [POJ 1151]
    学习链接:http:blog.csdn.netlwt36articledetails48908031学习扫描线主要学习的是一种扫描的思想,后期可以求解很 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • MySQL缓存机制深度解析
    本文详细探讨了MySQL的缓存机制,包括主从复制、读写分离以及缓存同步策略等内容。通过理解这些概念和技术,读者可以更好地优化数据库性能。 ... [详细]
  • 在维护公司项目时,发现按下手机的某个物理按键后会激活相应的服务,并在屏幕上模拟点击特定坐标点。本文详细介绍了如何使用ADB Shell Input命令来模拟各种输入事件,包括滑动、按键和点击等。 ... [详细]
author-avatar
鲁有军_644
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有