Raw data => feature engineering (training and test data) => model => model evaluation => decide whether the model is good enough => model application
Add the dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
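If the project builds with sbt instead of Maven, the equivalent line (assuming a Scala 2.11 build, so that %% resolves to spark-mllib_2.11) would be:

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.5"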
package com.mllib

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors

object Demo1_Vector {
  def main(args: Array[String]): Unit = {
    /**
      * Dense vector
      * Mainly used to hold data features
      */
    // the features of one row of data
    val vector: linalg.Vector = Vectors.dense(Array(0.0, 0.1, 0.2, 0.3, 0.0, 0.4, 0.0))
    println(vector)

    /**
      * Sparse vector
      * Saves space when the data contains many zeros;
      * the first array holds the indices, the second the values at those indices
      */
    val vector1: linalg.Vector = Vectors.sparse(11, Array(1, 2, 5, 9), Array(0.1, 0.2, 0.2, 0.2))
    println(vector1)

    /**
      * The two forms can be converted into each other with toSparse / toDense
      */
    println("as sparse vector: " + vector.toSparse)
    println("as dense vector: " + vector1.toDense)

    /**
      * LabeledPoint: one labeled training example,
      * made up of a label y and a feature vector of x values
      * (the label comes first, the feature vector second)
      */
    val pos: LabeledPoint = LabeledPoint(1.0, Vectors.dense(0.0, 2.0, 0.0, 0.3, 0.4))
    println(pos)
  }
}
The libsvm format is used for both training and test data.
Training and test data must have the same format; the target value in the test data is only there to measure the error.
label is the target value, i.e. the class (which category the sample belongs to); it is the thing you want to predict, usually an integer.
index is the feature index; indices are usually consecutive integers and must appear in ascending order.
value is the feature value the model trains on, usually a real number. A sample line is shown below.
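For example, a single libsvm line with label 1 and non-zero values for features 1, 3 and 7 (hypothetical numbers) looks like this:

1 1:0.5 3:0.2 7:1.0

Features absent from the line (2, 4, 5, 6, ...) are implicitly zero, which is what makes the format compact for sparse data.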
Reading libsvm data
package com.mllib

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo2_libsvm {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("libsvm")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    val df: DataFrame = spark.read
      .format("libsvm")
      .load("spark/data/人体指标.txt")

    // long column values are truncated by default; pass false to show them in full
    df.show(false)
  }
}
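Whatever the file contains, the libsvm source produces the same two-column layout, so df.printSchema() should print:

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)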
package com.mllib

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Machine learning workflow
  * 1 feature engineering
  * 2 split the data into a training set and a test set
  * 3 feed the training set to the algorithm and train the model (Spark computes iteratively)
  * 4 evaluate the model on the test set
  * 5 save the model
  * 6 use the model
  */
object Demo2_libsvm {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("libsvm")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    val df: DataFrame = spark.read
      .format("libsvm")
      .load("spark/data/人体指标.txt")

    // split into training and test sets
    val array: Array[Dataset[Row]] = df.randomSplit(Array(0.7, 0.3))
    // training set
    val train: Dataset[Row] = array(0)
    // test set
    val test: Dataset[Row] = array(1)

    /**
      * Choose the algorithm
      */
    // logistic regression
    val regression: LogisticRegression = new LogisticRegression()

    /**
      * Train the model; Spark iterates until convergence
      */
    val model: LogisticRegressionModel = regression.fit(train)

    /**
      * Evaluate the model on the test set
      */
    val frame: DataFrame = model.transform(test)

    /**
      * Compute the accuracy:
      * number of correct predictions / total count,
      * using the prediction column
      */
    frame.show()

    import spark.implicits._
    import org.apache.spark.sql.functions._
    val result = frame.select(
      sum(when($"label" === $"prediction", 1).otherwise(0)) / count($"label") as "p")
    result.show()

    /**
      * Save the model
      */
    model.save("spark/data/model")

    /**
      * Load and use the model elsewhere
      */
    //val model1: LogisticRegressionModel = LogisticRegressionModel.load("spark/data/model")
  }
}
Schema of the evaluation output: transform keeps the label and features columns and adds rawPrediction, probability and prediction.
The evaluation result: the accuracy, in the column p.
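The accuracy can also be computed with Spark's built-in MulticlassClassificationEvaluator instead of the manual sum/count query; a minimal sketch over the frame DataFrame from the code above:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// "accuracy" counts the fraction of rows where prediction equals label
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

val accuracy: Double = evaluator.evaluate(frame)
println(s"accuracy = $accuracy")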
package com.shujia.spark.mllib

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Demo5ImageTrain {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[8]")
      .appName("mllib")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    // read the training data
    val data: DataFrame = spark.read
      .format("libsvm")
      .load("spark/data/image/train")

    data.printSchema()

    // binarize the pixel features: values outside [0, 1] become 1.0, the rest 0.0
    val newdata: DataFrame = data.as[(Double, linalg.Vector)].map(kv => {
      val label: Double = kv._1
      val features: linalg.Vector = kv._2
      val array: Array[Double] = features.toArray.map(i => {
        if (i < 0) {
          1.0
        } else if (i > 1) {
          1.0
        } else {
          0.0
        }
      })
      (label, Vectors.dense(array))
    }).toDF("label", "features")

    // split into training and test sets
    val array: Array[Dataset[Row]] = newdata.randomSplit(Array(0.7, 0.3))
    // training set
    val train: Dataset[Row] = array(0)
    // test set
    val test: Dataset[Row] = array(1)

    // build the algorithm
    val logisticRegression: LogisticRegression = new LogisticRegression()

    // train the model
    val model: LogisticRegressionModel = logisticRegression.fit(train)

    // test the model
    val frame: DataFrame = model.transform(test)
    val result: DataFrame = frame.select(
      sum(when($"label" === $"prediction", 1).otherwise(0)) / count($"label") as "p")
    result.show()

    model.save("spark/data/image/model")
  }
}
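For a plain greater-than threshold, the hand-written map over features.toArray can be replaced by Spark's Binarizer transformer. A sketch follows; the 0.5 threshold is an assumption, and note that Binarizer cannot express the out-of-range rule above, which also maps negative values to 1.0:

import org.apache.spark.ml.feature.Binarizer

// every feature value > 0.5 becomes 1.0, everything else 0.0
val binarizer = new Binarizer()
  .setInputCol("features")
  .setOutputCol("binFeatures")
  .setThreshold(0.5)

val binarized: DataFrame = binarizer.transform(data)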
package com.mllib

import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo6_Modelus {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("mllib")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    import spark.implicits._

    // load the model
    val model: LogisticRegressionModel = LogisticRegressionModel.load("spark/data/image/model")

    // read the image
    val df = spark.read
      .format("image")
      .load("spark/data/image/10.jpg")

    // turn the raw bytes into the same binary features as in training;
    // the label is a placeholder, since transform ignores it
    val test: DataFrame = df.select($"image.data").as[Array[Byte]].map(data => {
      val features = data.map(byte => {
        val int: Int = byte.toInt
        if (int == 0) {
          0.0
        } else {
          1.0
        }
      })
      (1.0, Vectors.dense(features))
    }).toDF("label", "features")

    // predict
    val frame: DataFrame = model.transform(test)
    frame.show(false)
  }
}
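For reference, the image data source wraps everything in a single image struct column, which is why the code selects $"image.data"; df.printSchema() should print roughly:

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)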
package com.mllib

import java.io.StringReader

import org.wltea.analyzer.core.{IKSegmenter, Lexeme}

import scala.collection.mutable.ListBuffer

object Demo7_IK {
  def main(args: Array[String]): Unit = {
    val text: String = "你好，我是一名学生"
    println(fit(text))
  }

  def fit(text: String): List[String] = {
    val words: ListBuffer[String] = new ListBuffer[String]
    val reader: StringReader = new StringReader(text)
    val segmenter: IKSegmenter = new IKSegmenter(reader, true)
    // take the first token
    var lexeme: Lexeme = segmenter.next()
    while (lexeme != null) {
      val word: String = lexeme.getLexemeText
      words += word
      // take the next token
      lexeme = segmenter.next()
    }
    words.toList
  }
}
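The IK analyzer is not part of Spark, so it needs its own dependency. One commonly used Maven coordinate (an assumption; check the version against your repository) is:

<dependency>
    <groupId>com.janeluo</groupId>
    <artifactId>ikanalyzer</artifactId>
    <version>2012_u6</version>
</dependency>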
package com.mllib

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Demo8TextClaster {
  def main(args: Array[String]): Unit = {
    /**
      * Text classification
      */
    val spark: SparkSession = SparkSession.builder()
      .master("local[8]")
      .appName("mllib")
      .config("spark.sql.shuffle.partitions", 8)
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val data: DataFrame = spark.read
      .format("csv")
      .option("sep", "\t")
      .schema("label double, text string")
      .load("spark/data/train.txt")
      .repartition(8)

    // tokenize the text with the IK segmenter
    val wordsData: DataFrame = data.as[(Double, String)].map(kv => {
      // word segmentation
      (kv._1, Demo7_IK.fit(kv._2))
    }).filter(_._2.nonEmpty) // drop empty rows
      .map(kv => (kv._1, kv._2.mkString(" "))) // join the tokens with spaces
      .toDF("label", "text")

    /**
      * Turn the text into vectors with TF-IDF
      */
    // English-style tokenizer; splits on whitespace by default
    val tokenizer: Tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val tokenizerDF: DataFrame = tokenizer.transform(wordsData)

    // apply TF; produces a sparse vector
    val hashingTF: HashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val hashingTFDF: DataFrame = hashingTF.transform(tokenizerDF)

    /**
      * Add IDF
      */
    val idf: IDF = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // fit the IDF model
    val idfModel: IDFModel = idf.fit(hashingTFDF)
    // compute the IDF-weighted features
    val idfDF: DataFrame = idfModel.transform(hashingTFDF)

    // split into training and test sets
    val array: Array[Dataset[Row]] = idfDF.randomSplit(Array(0.7, 0.3))
    val train: Dataset[Row] = array(0)
    val test: Dataset[Row] = array(1)

    /**
      * Naive Bayes classifier,
      * well suited to text classification (e.g. spam filtering)
      */
    val naiveBayes: NaiveBayes = new NaiveBayes()

    // feed the training set to the algorithm and train the model
    val model: NaiveBayesModel = naiveBayes.fit(train)
    // val logisticRegression: LogisticRegression = new LogisticRegression()
    // val model: LogisticRegressionModel = logisticRegression.fit(train)

    // evaluate the model
    val frame: DataFrame = model.transform(test)
    frame.cache()
    frame.show(1000, false)

    // compute the accuracy
    val result: DataFrame = frame.select(
      sum(when($"label" === $"prediction", 1).otherwise(0)) / count($"label") as "p")
    result.show()

    // save both the IDF model and the classifier
    idfModel.save("spark/data/text/idfmodel")
    model.save("spark/data/text/model")
  }
}
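To classify new text elsewhere, the saved models have to be applied in the same order as during training. A minimal sketch, assuming an existing spark session with implicits imported, the Demo7_IK tokenizer from above, and the save paths used above; the input sentence is just example text:

import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.feature.{HashingTF, IDFModel, Tokenizer}

val idfModel: IDFModel = IDFModel.load("spark/data/text/idfmodel")
val nbModel: NaiveBayesModel = NaiveBayesModel.load("spark/data/text/model")

// tokenize the new text exactly like the training data
val newText = Seq(Demo7_IK.fit("这是一条新的文本").mkString(" ")).toDF("text")

// Tokenizer and HashingTF are stateless, so rebuilding them with the same settings is safe
val words = new Tokenizer().setInputCol("text").setOutputCol("words").transform(newText)
val rawFeatures = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").transform(words)
val features = idfModel.transform(rawFeatures)

nbModel.transform(features).select("prediction").show()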