聚类算法是机器学习中的一种无监督学习算法,它在数据科学领域应用场景很广泛,比如基于用户购买行为、兴趣等来构建推荐系统。
核心思想可以理解为,在给定的数据集中(数据集中的每个元素有可被观察的n个属性),使用聚类算法将数据集划分为k个子集,并且要求每个子集内部的元素之间的差异度尽可能低,而不同子集元素的差异度尽可能高。简而言之,就是通过聚类算法处理给定的数据集,将具有相同或类似的属性(特征)的数据划分为一组,并且不同组之间的属性相差会比较大。
K-Means算法是聚类算法中应用比较广泛的一种聚类算法,比较容易理解且易于实现。
KMeans算法的基本思想是随机给定K个初始簇中心,按照最邻近原则把待分类样本点分到各个簇。然后按平均法重新计算各个簇的质心,从而确定新的簇心。一直迭代,直到簇心的移动距离小于某个给定的值或者满足已定条件。主要分为4个步骤:
示例图:
KMeans算法在做聚类分析的过程中主要有两个难题:初始聚类中心的选择和聚类个数K的选择。
Spark MLlib针对"标准"KMeans的问题,在实现自己的KMeans上主要做了如下核心优化:
1. 选择合适的初始中心点
Spark MLlib在初始中心点的选择上,有两种算法:
随机选择:依据给的种子seed,随机选择K个随机中心点
k-means||:默认的算法
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"
2. 计算样本属于哪一个中心点时对距离计算的优化
假设中心点是(a1,b1),要计算的点是(a2,b2),那么Spark MLlib采取的计算方法是(记为lowerBoundOfSqDist):
对比欧几里得距离(记为EuclideanDist):
可轻易证明lowerBoundOfSqDist小于或等于EuclideanDist,并且计算lowerBoundOfSqDist很方便,只需处理中心点和要计算的点的L2范数。那么在实际处理中就分两种情况:
关于fastSquaredDistance:
首先计算一个精度:
val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
if (precisionBound1
} else {sqDist = Vectors.sqdist(v1, v2)
}
//精度不满足要求时,则进行Vectors.sqdist(v1, v2)的处理,即原始的距离计算
基于mllib包下的KMeans相关源码涉及的类和方法(ml包下与下面略有不同,比如涉及到的fit方法):
主要看一下train和runAlgorithm的核心源码:
def train(// 数据样本data: RDD[Vector],// 聚类数量k: Int,// 最大迭代次数maxIterations: Int,// 初始化中心,支持"random"或者"k-means||"initializationMode: String,// 初始化时的随机种子seed: Long): KMeansModel = {new KMeans().setK(k).setMaxIterations(maxIterations).setInitializationMode(initializationMode).setSeed(seed).run(data)
}
/*** Implementation of K-Means algorithm.*/private def runAlgorithm( data: RDD[VectorWithNorm],instr: Option[Instrumentation]): KMeansModel = {val sc = data.sparkContextval initStartTime = System.nanoTime()val distanceMeasureInstance = DistanceMeasure.decodeFromString(this.distanceMeasure)val centers = initialModel match {case Some(kMeansCenters) =>kMeansCenters.clusterCenters.map(new VectorWithNorm(_))case None =>if (initializationMode == KMeans.RANDOM) {// randominitRandom(data)} else {// k-means||initKMeansParallel(data, distanceMeasureInstance)}}val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")var converged = falsevar cost = 0.0var iteration = 0val iterationStartTime = System.nanoTime()instr.foreach(_.logNumFeatures(centers.head.vector.size))// Execute iterations of Lloyd's algorithm until converged// Kmeans迭代执行,计算每个样本属于哪个中心点,中心点累加的样本值以及计数。然后根据中心点的所有样本数据进行中心点的更新,并且比较更新前的数值,根据两者距离判断是否完成//迭代次数小于最大迭代次数,并行计算的中心点还没有收敛while (iteration
诺丹姆吉本斯主教中学(Notre Dame-Bishop Gibbons School) 71 0 0 283047.0 13289.0
海景基督高中(Ocean View Christian Academy) 45 0 0 276403.0 13289.0
卡弗里学院(Calvary Baptist Academy) 58 0 0 227567.0 13289.0
...
2. 示例代码
//将加载的rdd数据,每一条变成一个向量,整个数据集变成矩阵
val parsedata &#61; rdd.map { case Row(schoolid, schoolname, locationid, school_type, zs, fee, byj) &#61;>//"特征因子":学校位置id,学校类型,住宿方式,学费,备用金val features &#61; Array[Double](locationid.toString.toDouble, school_type.toString.toDouble, zs.toString.toDouble, fee.toString.toDouble, byj.toString.toDouble)//将数组变成机器学习中的向量Vectors.dense(features)}.cache() //默认缓存到内存中&#xff0c;可以调用persist()指定缓存到哪//用kmeans对样本向量进行训练得到模型//聚类中心val numclusters &#61; List(3, 6, 9)//指定最大迭代次数val numIters &#61; List(10, 15, 20)var bestModel: Option[KMeansModel] &#61; Nonevar bestCluster &#61; 0var bestIter &#61; 0val bestRmse &#61; Double.MaxValuefor (c <- numclusters; i <- numIters) {val model &#61; KMeans.train(parsedata, c, i)//集内均方差总和(WSSSE)&#xff0c;一般可以通过增加类簇的个数 k 来减小误差&#xff0c;一般越小越好&#xff08;有可能出现过拟合&#xff09;val d &#61; model.computeCost(parsedata)println("选择K:" &#43; (c, i, d))if (d
上述示例只是一个简单的demo&#xff0c;实际应用中会更复杂&#xff0c;牵涉到数据的预处理&#xff0c;比如对数据进行量化、归一化&#xff0c;以及如何调参以获取最优训练模型。
推荐文章&#xff1a;
Spark实现推荐系统中的相似度算法
关于一些技术点的随笔记录&#xff08;二&#xff09;
Spark存储Parquet数据到Hive&#xff0c;对map、array、struct字段类型的处理
Kafka中sequence IO、PageCache、SendFile的应用详解
对Spark硬件配置的建议mp.weixin.qq.com关注微信公众号&#xff1a;大数据学习与分享&#xff0c;获取更多技术干货