文章地址:http://www.haha174.top/article/details/253584
项目源码:https://github.com/haha174/spark.git
开启spark 从入门到放弃/笑哭。下面不多说来写一个hello world 压压惊。
之前搭建集群用的是 spark 2.2 hadoop 2.9
所以开发的需要引入如下的依赖
<properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding><spark.version>2.2.1spark.version><java.version>1.8java.version><hadoop.version>2.9.0hadoop.version>properties><dependencies><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version><scope>testscope>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-core_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-sql_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-hive_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming_2.10artifactId><version>${spark.version}version>dependency><dependency><groupId>org.apache.hadoopgroupId><artifactId>hadoop-clientartifactId><version>${hadoop.version}version>dependency><dependency><groupId>org.apache.sparkgroupId><artifactId>spark-streaming-kafka_2.11artifactId><version>1.6.2version>dependency>dependencies>
/*** 第一步 &#xff0c;创建SparkConf* setMaster 设置 集群 master的url 如果设置为local 表示在本地运行*/SparkConf conf&#61;new SparkConf().setAppName("WorldCountLocal").setMaster("local");
/**
* 第二步 创建SparkContext 对象
* 在spark 中SparkContext 是spark 所有功能的入口 无论使用的是java scala 甚至py 编写都必须有一个SparkContext
* 它的主要作用包括初始化spark 应用程序所需要的一些核心组件&#xff0c;包括调度器&#xff08;DAGSchedule,taskScheduler&#xff09;,他还会去spark master 节点上去注册等等
* 但是呢&#xff0c;在spark中编写不同类型的spark 应用程序&#xff0c;使用的SparkContext
* 如果使用scala 使用 使用原生的SparkContext
* 如果使用java 那么就是用JavaSparkContext
* 如果使用Spark Sql 程序 那么就是 SQLContext,HiveContext
* 如果开发Spark Streaming 程序 那么就是它独有的SparkContext
* 以此类推
*/
JavaSparkContext sc&#61;new JavaSparkContext(conf);
/**
* 第三步&#xff1a; 要针对输入源&#xff08;hdfs文件,本地文件,等等&#xff09;创建一个初始的RDD
* 输入源中的数据会被打散&#xff0c;分配到RDD的每个partition 中从而形成一个初始的分布式数据集
* 本次测试 所以针对本地文件
* SparkContext 中用于根据文件 类型的输入源创建RDD 的方法&#xff0c;叫做textFile()
* 在我们这里呢 RDD 中 有元素这种概念 如果是 hdfs 或者本地文件呢 创建RDD 每一个文件就相当于文件里面的一行
*/
JavaRDD lines&#61;sc.textFile("C:\\Users\\haha174\\Desktop\\data\\world-count.txt");
JavaRDD<String> words &#61; lines.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split("-")) .iterator();
}
});
JavaPairRDD pairs &#61; words.mapToPair(new PairFunction() {private static final long serialVersionUID &#61; 1L;public Tuple2 call(String word) throws Exception {return new Tuple2(word, 1);}});
/*** public interface Function2 extends Serializable {*R call(T1 var1, T2 var2) throws Exception;*}* 第三个参数表示返回类型*/
JavaPairRDD wordCounts &#61; pairs.reduceByKey(new Function2() {private static final long serialVersionUID &#61; 1L;public Integer call(Integer v1, Integer v2) throws Exception {return v1 &#43; v2;}});
wordCounts.foreach(new VoidFunction>() {private static final long serialVersionUID &#61; 1L;public void call(Tuple2 wordCount) throws Exception {System.out.println(wordCount._1 &#43; " appeared " &#43; wordCount._2 &#43; " times.");}});sc.close();
}