1 Advanced Features
- Stateful operator: updateStateByKey
package com.myspark.com

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // updateStateByKey requires a checkpoint directory to persist the running state
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream("localhost", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1))
    // Merge each batch's counts into the state kept per key
    val state = result.updateStateByKey[Int](updateFunction _)
    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // currentValues: the new counts for a key in this batch; preValues: the previously stored total
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
    val current = currentValues.sum
    val pre = preValues.getOrElse(0)
    Some(current + pre)
  }
}
2 Hands-On Example
Count the number of times each word has appeared so far and write the result to MySQL.
2.1 Create the Table
mysql> use spark_test;
Database changed
mysql> create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
Query OK, 0 rows affected (0.04 sec)
2.2 Source Code
Modify the pom file to add the MySQL JDBC driver dependency:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
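For Connector/J 5.1.x the driver class is com.mysql.jdbc.Driver, which matches the Class.forName call in the code below.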
package com.myspark.com

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()

    // Write each batch to MySQL: one connection per partition, created on the executors
    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into wordcount(word,wordcount) values('" + record._1 + "'," + record._2 + ")"
          connection.createStatement().execute(sql)
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/spark_test", "root", "root")
  }
}
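The connection is created inside foreachPartition so that one connection serves a whole partition and nothing has to be serialized from the driver (JDBC connections are not serializable). As a rough alternative sketch (not from the original code, assuming the same wordcount table and the createConnection() helper above), a PreparedStatement avoids building SQL by string concatenation:

// Sketch: write one partition using a PreparedStatement
result.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val connection = createConnection()
    val ps = connection.prepareStatement("insert into wordcount(word, wordcount) values (?, ?)")
    partitionOfRecords.foreach { case (word, count) =>
      ps.setString(1, word)
      ps.setInt(2, count)
      ps.executeUpdate()
    }
    ps.close()
    connection.close()
  })
})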
2.3 Run
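Start a socket source with nc -lk 6789, run ForeachRDDApp, and type some words into the netcat terminal. Each batch's counts should then show up in the wordcount table, e.g. with select * from wordcount; in the mysql client. Note that this version inserts a new row for every batch rather than updating the running total of a word.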
3 Window Functions
- Periodically compute over the data that falls within a window of time.
- window length: the duration of the window.
- sliding interval: the interval at which the window operation is performed.
- Both parameters are tied to the batch interval: each must be a multiple of it.
- In other words, every sliding interval the data of the previous window length is processed (see the sketch after this list).
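As an illustration (a sketch, not from the original post), with the 5-second batch interval used above a windowed word count could use a 30-second window length and a 10-second sliding interval, both multiples of the batch interval:

// Sketch: assumes `lines` is the socket DStream from the earlier examples.
// Every 10 seconds, count the words received during the last 30 seconds.
val windowedCounts = lines.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()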
4 Blacklist Filtering (transform)
4.1 Example
1. Access log (=> DStream)
001,aa
002,bb
003,cc
=> (aa: 001,aa) (bb: 002,bb) (cc: 003,cc)
2. Blacklist (=> RDD)
aa
cc
=> (aa: true) (cc: true)
3. leftjoin
(aa: [<001,aa>, <true>])
(bb: [<002,bb>, <false>])
(cc: [<003,cc>, <true>])
After the join, records whose blacklist flag is true are filtered out, leaving only 002,bb.
4.2 Source Code
package com.myspark.com

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Static blacklist as an RDD of (name, true)
    val blacks = List("aa", "cc")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(x => (x, true))

    val lines = ssc.socketTextStream("localhost", 6789)
    // transform exposes each batch's RDD so it can be joined with the static blacklist RDD
    val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
      rdd.leftOuterJoin(blacksRDD)
        .filter(x => x._2._2.getOrElse(false) != true) // drop records flagged as blacklisted
        .map(x => x._2._1)
    })
    clicklog.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
4.3 Result
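Expected behavior (derived from the example in 4.1, not an actual captured run): with aa and cc on the blacklist, entering 001,aa, 002,bb and 003,cc through nc -lk 6789 should leave only 002,bb in the printed output.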