1. Start Hadoop
sh start-dfs.sh
sh start-yarn.sh
2. Start Spark
cd /appl/spark-1.4.0/
sbin/start-all.sh
3. Prepare the data
hadoop fs -put /mk/test/kmeans_data.txt /test/
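The input file follows the same layout as Spark's bundled data/mllib/kmeans_data.txt: one point per line, with space-separated numeric features. For reference, the bundled sample looks like this (your own kmeans_data.txt only needs to follow the same format):
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2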
4. Write the program and package it into a jar
Java
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.SparkConf;

/*
 * Test:
 * sh start-dfs.sh
 * sh start-yarn.sh
 * cd /appl/spark-1.4.0/
 * sbin/start-all.sh
 * hadoop fs -put /mk/test/kmeans_data.txt /test/
 * ./bin/spark-submit /mk/test/KMeansSim.jar
 */
public class KMeansSim {
    public static void main(String[] args) {
        // environment initialization
        SparkConf conf = new SparkConf().setAppName("K-means Example");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load and parse data (${SPARK_HOME}/data/mllib/kmeans_data.txt)
        String path = "/test/kmeans_data.txt";
        JavaRDD<String> data = sc.textFile(path);
        JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
            public Vector call(String s) {
                return Vectors.dense(toDoubleArray(s));
            }
        });
        parsedData.cache();

        // Cluster the data into two classes using KMeans
        int numClusters = 2;
        int numIterations = 20;
        KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

        // Evaluate clustering by computing Within Set Sum of Squared Errors
        double WSSSE = clusters.computeCost(parsedData.rdd());
        System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

        // Save and load model
        clusters.save(sc.sc(), "myModelPath");
        KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");

        // predict test
        System.out.println("~~~predict:" + clusters.predict(Vectors.dense(toDoubleArray("1.0 2.1 3.8"))));

        // ending
        sc.stop();
    }

    // String to double[]
    public static double[] toDoubleArray(String s) {
        String[] sarray = s.split(" ");
        double[] values = new double[sarray.length];
        for (int i = 0; i < sarray.length; i++) {
            values[i] = Double.parseDouble(sarray[i]);
        }
        return values;
    }
}
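Optionally, after the job has run (step 5 below), you can verify the saved model by loading it back and printing the learned cluster centers. The following is a minimal sketch: the class name KMeansInspect and its use as a separate job are assumptions, not part of the original program, while KMeansModel.load and clusterCenters() are standard MLlib APIs.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;

// Hypothetical companion class: loads the model that KMeansSim saved under
// "myModelPath" and prints the learned cluster centers.
public class KMeansInspect {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("K-means Inspect");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the model persisted by clusters.save(sc.sc(), "myModelPath")
        KMeansModel model = KMeansModel.load(sc.sc(), "myModelPath");

        // clusterCenters() returns one Vector per cluster (here: 2 centers)
        for (Vector center : model.clusterCenters()) {
            System.out.println("cluster center: " + center);
        }

        sc.stop();
    }
}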
5. Run
./bin/spark-submit /mk/test/KMeansSim.jar
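If the jar's manifest does not declare a main class, spark-submit can be told explicitly which class to run and which master to use; for example (the --class and --master values below are assumptions to adapt to your own setup):
./bin/spark-submit --class KMeansSim --master yarn-client /mk/test/KMeansSim.jar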