package com.sql.it

import kafka.serializer.StringDecoder

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingSQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingWindowOfKafka22")
      .setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
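    // Seconds(5) is the batch interval: each RDD in the DStream covers 5 seconds of received data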
    // A checkpoint directory must be set when stateful APIs such as updateStateByKey are used
    // The directory pointed to by this path must not already exist
    ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351")

    val kafkaParams = Map(
      "group.id" -> "streaming-kafka-78912151",
      "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
      "auto.offset.reset" -> "smallest"
    )
    val topics = Map("beifeng" -> 4) // the value per topic is the number of consumer threads, so it must be >= 1
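    // The receiver-based createStream uses the Kafka high-level consumer: offsets are tracked in
    // ZooKeeper under the given group.id, and "smallest" makes a group with no committed offsets
    // start from the earliest available offset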
    val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,                             // the StreamingContext
      kafkaParams,                     // Kafka connection parameters ===> connect through the Kafka high-level consumer API
      topics,                          // topic names to read and the number of receiver threads per topic
      StorageLevel.MEMORY_AND_DISK_2   // storage level for data received from Kafka
    ).map(_._2)                        // keep only the message value, drop the key

    /**
     * transform: turns a DStream operation into an RDD operation; the function only needs to return a new RDD
     */
    dstream.transform(rdd => {
      // use SQL to compute the word count
      val sqlContext = SQLContextSingleton.getSQLContext(rdd.sparkContext)
      import sqlContext.implicits._
      val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
      procedRDD.toDF("word", "c").registerTempTable("tb_word")
      val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
        val word = row.getAs[String]("word")
        val count = row.getAs[Long]("vc")
        (word, count)
      })

      resultRDD
    }).print()
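    // print() is an output operation; at least one output operation is required for the DStream to be computed each batch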

    // start processing
    ssc.start()
    ssc.awaitTermination() // block until the computation is stopped or the driver thread is interrupted
  }
}
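
/**
 * Lazily creates a single SQLContext per JVM (double-checked locking on a transient field)
 * so it can be reused across batches inside the DStream transformation above.
 */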
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getSQLContext(sc: SparkContext): SQLContext = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = new SQLContext(sc)
        }
      }
    }
    instance
  }
}