作者:结婚吧结婚吧结婚吧_813 | 来源:互联网 | 2023-09-07 16:31
代码packagecom.badou.sqlimportorg.apache.spark.SparkConfimportorg.apache.spark.SparkContex
代码
package com.badou.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

/** Single-field row type used to turn an RDD of words into a DataFrame with a `word` column. */
case class Record(word: String)

/**
 * Holds one lazily created [[SQLContext]] so that every streaming batch reuses it
 * instead of building a new one.
 */
object SQLContextSingleton {

  // Stays null until the first getInstance call; no synchronization is performed here.
  @transient private var instance: SQLContext = _

  /**
   * Returns the shared SQLContext, creating it on the first call.
   *
   * @param sparkContext context used to construct the SQLContext on the first call only;
   *                     later calls return the already-created instance and ignore it
   * @return the shared SQLContext
   */
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

/**
 * Word count over a socket text stream: each batch is split into words, registered as
 * the temporary table `words`, and counted with a SQL `group by` query.
 */
object sqlAndStreamingWC {

  /**
   * Entry point.
   *
   * @param args `args(0)` is the host to connect to and `args(1)` is the port
   *             (parsed with `toInt`); the program prints a usage line and exits
   *             with status 1 when fewer than two arguments are given
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: sqlAndStreamingWC <hostname> <port>")
      System.exit(1)
    }

    // Local mode with two threads, 30-second batches.
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sqlAndStreamingWC")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(30))

    // One input line per socket text line, then split on single spaces into words.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Reuse the shared SQLContext; its implicits provide `toDF()` on the RDD.
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val wordsDataFrame = rdd.map(w => Record(w)).toDF()
      wordsDataFrame.registerTempTable("words")

      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
测试：
结果：