When Spark runs standalone, everything goes through a main method. So how do you embed it in a Spring Boot application and trigger jobs over HTTP?
The walkthrough below reads a text file and returns the 10 most frequent words in it.
Project environment:
JDK: 1.8
Spark: 2.2.0

Project setup:
pom.xml dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
Configuration class (binds the spark.* properties from application.yml and exposes a singleton JavaSparkContext bean):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spark")
public class SparkContextBean {

    private String sparkHome = ".";
    private String appName = "sparkTest";
    private String master = "local";

    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf() throws Exception {
        return new SparkConf().setAppName(appName).setMaster(master);
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() throws Exception {
        return new JavaSparkContext(sparkConf());
    }

    public String getSparkHome() { return sparkHome; }
    public void setSparkHome(String sparkHome) { this.sparkHome = sparkHome; }
    public String getAppName() { return appName; }
    public void setAppName(String appName) { this.appName = appName; }
    public String getMaster() { return master; }
    public void setMaster(String master) { this.master = master; }
}
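The original post never shows the Spring Boot entry point, so here is a minimal sketch of one; the class name Application is my own choice, and it assumes the class sits in (or above) the package containing the beans so component scanning finds them:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Minimal boot class (illustrative; not from the original article).
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}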
Service class:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;

@Service
public class SparkTestService {

    private static final Logger logger = LoggerFactory.getLogger(SparkTestService.class);
    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    private JavaSparkContext sc;

    public Map<String, Integer> calculateTopTen() {
        Map<String, Integer> result = new HashMap<>();
        JavaRDD<String> lines = sc.textFile("src/test/java/test.txt").cache();
        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(lines.count());

        JavaRDD<String> words = lines.flatMap(str -> Arrays.asList(SPACE.split(str)).iterator());
        JavaPairRDD<String, Integer> ones = words.mapToPair(str -> new Tuple2<>(str, 1));
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Integer i1, Integer i2) -> i1 + i2);
        // Swap to (count, word) so we can sort by key, then swap back.
        JavaPairRDD<Integer, String> temp = counts.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<String, Integer> sorted = temp.sortByKey(false)
                .mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(sorted.count());

        //List<Tuple2<String, Integer>> output = sorted.collect();
        // sorted is already in descending count order, so take(10) returns the top 10.
        // (The original top(10) would fail at runtime: Tuple2 is not Comparable.)
        List<Tuple2<String, Integer>> output = sorted.take(10);
        for (Tuple2<String, Integer> tuple : output) {
            result.put(tuple._1(), tuple._2());
        }
        return result;
    }

    /**
     * Exercise demo to get familiar with the RDD API.
     */
    public void sparkExerciseDemo() {
        List<Integer> data = Lists.newArrayList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> rdd01 = sc.parallelize(data);
        rdd01 = rdd01.map(num -> num * num);
        //data map :1,4,9,16,25,36
        logger.info("data map :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        rdd01 = rdd01.filter(x -> x < 6);
        //data filter :1,4
        logger.info("data filter :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        rdd01 = rdd01.flatMap(x -> {
            Integer[] test = {x, x + 1, x + 2};
            return Arrays.asList(test).iterator();
        });
        //flatMap :1,2,3,4,5,6
        logger.info("flatMap :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        JavaRDD<Integer> unionRdd = sc.parallelize(data);
        rdd01 = rdd01.union(unionRdd);
        //union :1,2,3,4,5,6,1,2,3,4,5,6
        logger.info("union :{}", Joiner.on(",").skipNulls().join(rdd01.collect()));

        List<Integer> result = Lists.newArrayList();
        result.add(rdd01.reduce((Integer v1, Integer v2) -> v1 + v2));
        //reduce :42
        logger.info("reduce :{}", Joiner.on(",").skipNulls().join(result));
        result.forEach(System.out::print);

        JavaPairRDD<Integer, Iterable<Integer>> groupRdd = rdd01.groupBy(x -> {
            logger.info("======groupBy======:{}", x);
            if (x > 10) return 0;
            else return 1;
        });
        List<Tuple2<Integer, Iterable<Integer>>> grouped = groupRdd.collect();
        //group by key:1 value:1,2,3,4,5,6,1,2,3,4,5,6
        grouped.forEach(x -> logger.info("group by key:{} value:{}", x._1, Joiner.on(",").skipNulls().join(x._2)));
    }

    /**
     * Spark Streaming exercise.
     */
    public void sparkStreaming() throws InterruptedException {
        // Batch interval: 10 seconds.
        JavaStreamingContext jsc = new JavaStreamingContext(sc, Durations.seconds(10));
        JavaReceiverInputDStream<String> lines = jsc.receiverStream(new CustomReceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<Long> count = lines.count();
        count = count.map(x -> {
            logger.info("number of records in this batch: {}", x);
            return x;
        });
        count.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}
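As a quick sanity check of the word-count chain above, the same transformations can be run against an in-memory list instead of a file. A minimal, self-contained sketch; the class name and sample words are made up for illustration:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Hypothetical smoke test: parallelize() replaces textFile(), otherwise
// the pipeline is identical to calculateTopTen().
public class WordCountSmokeTest {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("smokeTest").setMaster("local"));
        List<Tuple2<String, Integer>> top = sc.parallelize(Arrays.asList("a b a", "b a c"))
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
                .reduceByKey(Integer::sum)
                .mapToPair(t -> new Tuple2<Integer, String>(t._2, t._1))
                .sortByKey(false)
                .mapToPair(t -> new Tuple2<String, Integer>(t._2, t._1))
                .take(10);
        top.forEach(t -> System.out.println(t._1() + " -> " + t._2())); // a -> 3, b -> 2, c -> 1
        sc.stop();
    }
}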
import org.apache.commons.lang.math.RandomUtils; // commons-lang 2.x, which provides nextInt(int)
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom receiver for Spark Streaming; emits random integers, pausing one second
 * whenever the value is below 20.
 *
 * @author hz15041240
 * @date 2018-01-18 16:37
 */
public class CustomReceiver extends Receiver<String> {

    private static final Logger logger = LoggerFactory.getLogger(CustomReceiver.class);
    private static final long serialVersionUID = 5817531198342629801L;

    public CustomReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }

    @Override
    public void onStart() {
        new Thread(this::doStart).start();
        logger.info("Starting Receiver...");
        //doStart();
    }

    public void doStart() {
        while (!isStopped()) {
            int value = RandomUtils.nextInt(100);
            if (value < 20) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    logger.error("sleep exception", e);
                    restart("sleep exception", e);
                }
            }
            store(String.valueOf(value));
        }
    }

    @Override
    public void onStop() {
        logger.info("Stopping Receiver...");
    }
}
Controller:

import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {

    @Autowired
    private SparkTestService sparkTestService;

    @RequestMapping("/demo/top10")
    public Map<String, Integer> calculateTopTen() {
        return sparkTestService.calculateTopTen();
    }

    @RequestMapping("/demo/exercise")
    public void exercise() {
        sparkTestService.sparkExerciseDemo();
    }

    @RequestMapping("/demo/stream")
    public void streamingDemo() throws InterruptedException {
        sparkTestService.sparkStreaming();
    }
}
application.yml:
server:
  port: 8054
spark:
  spark-home: .
  app-name: sparkTest
  master: local[4]
Create a test.txt file under the project's src/test/java directory; any pile of random words inside it will do.
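If you would rather generate the file than type it, a throwaway sketch like the following works; the vocabulary, line length, and class name are arbitrary choices of mine:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Random;

// One-off generator for a random test.txt (illustrative, not from the article).
public class TestFileGenerator {
    public static void main(String[] args) throws Exception {
        String[] vocab = {"spark", "spring", "boot", "rdd", "stream", "java"};
        Random random = new Random();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 1000; i++) {
            sb.append(vocab[random.nextInt(vocab.length)]);
            sb.append(i % 20 == 19 ? "\n" : " "); // 20 words per line
        }
        Files.write(Paths.get("src/test/java/test.txt"), sb.toString().getBytes());
    }
}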
Start the project and visit http://localhost:8054/demo/top10 to get the top 10 words by frequency.
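Besides the browser, the endpoint can also be hit programmatically; a quick sketch using Spring's RestTemplate, assuming the app is already running on port 8054:

import java.util.Map;
import org.springframework.web.client.RestTemplate;

// Hypothetical client-side check of the top-10 endpoint.
public class Top10Client {
    public static void main(String[] args) {
        Map<?, ?> top10 = new RestTemplate()
                .getForObject("http://localhost:8054/demo/top10", Map.class);
        top10.forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}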