首先是基本的条件概率求解的公式。
在现实生活中,我们经常会碰到已知一个条件概率,求得两个时间交换后的概率的问题。也就是在已知P(A|B)的情况下,如何求得P(B|A). 其中P(A|B)表示在事件B发生的前提下, 事件A发生的概率。
其中P(A)指的是先验概率或者叫做边缘概率。因为他不需要考虑任何B方面的影响。
P(A|B)指的是B发生后A的条件概率。
P(B)是B的先验概率或者是边缘概率,也被称为标准化常量(normalized constant)。
这里给出贝叶斯的计算公式:
对于给定的分类项,求解此分类项在各个类别下出现的概率。哪个最大,哪个就被认为是分好类的类型。
假设 A={a1,a2,a3…}为待分类项,a1,a2是每个待分类项的属性。
有一个类别的集合B={y1,y2….}.我们需要计算各个类别的先验概率,并取对数,计算公式如下.
p(i)=log(p(yi))
=log((i类别的次数+平滑因子)/(总次数+类别数*平滑因子))
我们需要计算P(y1|A),P(y2|A),P(y3|A)等等。求得计算结果的最大值,我们可以认为这个特征属性的类别为y。
首先我们需要计算在每一个类别下的各个特征属性的概率,即P(a1|y1),P(a2|y1),P(a3|y1),
P(a1|y2),P(a2|y2),P(a3|y2,P(a1|y3),P(a2|y3),P(a3|y3)等。
在各个类别下的各个特征属性的条件概率估计,并取对数。
theta(i)(j)=log(p(aj|yi))
=log(sumTermFreqs(j)+平滑因子)-thetaLogDenom.
其中theta(i)(j)是i类别下j的特征的概率,sumTermFreqs(j)是特征j出现的次数,thetaLogDenom 分为如下的两个类型:
假设各个特征属性的概率是相互独立的,有贝叶斯定理,我们有,
由于分母为常数,所以只需要分子求值即可。又因为各个属性相互独立。可以得到:
对上式同取log。
spark 的源码实现分析:
分布式的实现方法大致如下:首先对样本进行聚合操作,统计所有标签出现的次数,和对应的特征之和。即对labledPoint(label, features)这样子的一个元祖采用一个combineByKey聚合函数:对同一标签数据进行聚合统计操作。
通过聚合操作后,可以通过聚合结果先验概率,条件概率。然后返回一个贝叶斯模型。
预测的时候,将需要预测的样本向量乘以theta矩阵,即条件概率矩阵。然后计算每个样本属于每个类别的概率,然后选取其中的最大项作为类别。
NaiveBayes 源码大致框架:
好了接下来我们来看看NaiveBayes的源代码,首先是其伴生对象。
1,object NaiveBayes:
伴生对象主要定义了训练静态贝叶斯的分类模型的train方法。train方法通过设置训练参数进行模型训练,主要有如下的参数:
input:训练样本,其格式为RDD[LabeledPoint],其中labeledPoint格式为(label,features)。
lamda:平滑因子,防止分母出现0.
/***朴素贝叶斯的伴生对象。*/
@Since("0.9.0")
object NaiveBayes {private[spark] val Multinomial: String = "multinomial" //表示的是多项式类型private[spark] val Bernoulli: String = "bernoulli" // 表示的是伯努利类型private[spark] val supportedModelTypes = Set(Multinomial, Bernoulli)//训练贝叶斯模型,根据训练样本,类型为RDD[LabeledPoint]//LabeledPointoink,其格式为(label,features)def train(input: RDD[LabeledPoint]): NaiveBayesModel = {new NaiveBayes().run(input)}@Since("0.9.0")//这个train 方法,除了上一个的基本参数之外,传入了一个平滑因子,lamda//def train(input: RDD[LabeledPoint], lambda: Double): NaiveBayesModel = {new NaiveBayes(lambda, Multinomial).run(input)}@Since("1.4.0")//输入样本,平滑因子,还有模型的类别,分别为多项式类型,和伯努利类型。def train(input: RDD[LabeledPoint], lambda: Double, modelType: String): NaiveBayesModel = {require(supportedModelTypes.contains(modelType),s"NaiveBayes was created with an unknown modelType: $modelType.")new NaiveBayes(lambda, modelType).run(input)}}
2, 接下来是NaiveBayes的主类:
class NaiveBayes:我们首先来看看它的基本的构造器和基本方法:
/***朴素贝叶斯分类器的类,训练一个朴素贝叶斯模型,根据rdd样本数据,* 其格式为(label,features).**训练朴素贝叶斯分类器模型,可以通过TF-IDF 生成向量。用于文档分类,*如果让向量为0-1模式,则可以应用于bernoulli NB,输入的特征必须是非负的。**/class NaiveBayes private (private var lambda: Double,private var modelType: String) extends Serializable with Logging {import NaiveBayes.{Bernoulli, Multinomial}@Since("1.4.0")def this(lambda: Double) = this(lambda, NaiveBayes.Multinomial)@Since("0.9.0")def this() = this(1.0, NaiveBayes.Multinomial) //在这个参数里面默认的平滑因子是1/** Set the smoothing parameter. Default: 1.0. */@Since("0.9.0")def setLambda(lambda: Double): NaiveBayes = { //设置平滑因子。this.lambda = lambdathis}
接下来我们来看看整个朴素贝叶斯最为重要的run方法,所有的核心代码都在这个里面。
1 NaiveBayes的run方法用来训练模型,这个方法主要用于计算先验概率和条件概率。这个方法的实现的逻辑是:首先对样本进行聚合。以label为key,调用combineByKey方法,聚合里面的同一个label的features,得到所有的label的统计(label,(count, features之和))。
2 根据先验概率的计算公式p=log((n+lamda)/(numDocuments+numLabels*lamda))计算每个label的先验概率。根据条件概率log((sumTermFreqs(j)+lamda)/thetaLogDenom)计算每个label的先验概率。
3 根据条件概率log((sumTermFreqs(j)+lamda)/thetaLogDenom)计算在各个label下面的各个features的条件概率,返回的是一个二维数组。
4 最后通过标签列表,类别先验概率,特征的条件概率,类型生成一个朴素贝叶斯模型。
run方法的代码如下:
def run(data: RDD[LabeledPoint]): NaiveBayesModel = {//在这个里面定义了一个函数,来判断输入的特征向量的值。要求所有的向量值非负val requireNonnegativeValues: Vector => Unit = (v: Vector) => {val values = v match { //判断向量的类型,是密集向量还是稀疏向量case sv: SparseVector => sv.values // 根据不同类型的向量,得到其value值。case dv: DenseVector => dv.values}//判断是不是所有的值都大于0,否则抛出一个错误。多项式型需要每一个value值得大于0.if (!values.forall(_ >= 0.0)) {throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")}}//检测所有的伯努利的值,要求所有的向量值为0或者是1.val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {val values = v match {case sv: SparseVector => sv.valuescase dv: DenseVector => dv.values}if (!values.forall(v => v == 0.0 || v == 1.0)) {throw new SparkException(s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")}}//从这个地方开始对数据进行计算。//对于每一个特征进行聚合,求得每一个标签的对应的特征的频数,//aggretaded表示通过label为key,聚合同一个label的features特征。他的返回格式是 (label,(计数,features之和))//注意这个combineByKey 算子: 其中,createCombiner表示,当combineByKey第一次遇到值为k的Key时,调用createCombiner函数,将v转换为c//然后是第二个mergeValue: combineByKey不是第一次遇到值为k的Key时,调用mergeValue函数,将v累加到c中//mergeCombiners:将两个c,合并成一个。///注意首先计算的是条件概率,返回个是为(label,(计数,features之和)),//表示每个样本标签的数量的和,和其对应的样本向量的特征之和。val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](//完成从V->C类型的转换,(v:Vector)=》(c:(long,vector))createCombiner = (v: Vector) => { // 根据上面所说的,输入的是一个vector,通过createCombiner函数将if (modelType == Bernoulli) { //将这个v转换成c的格式。格式为(1,densevector)类型。第二步需要做的就是每次遇到一个v,将其合并为c。requireZeroOneBernoulliValues(v)} else {requireNonnegativeValues(v)}(1L, v.copy.toDense) //这个是第一个函数的返回值,即将(c:Vector)=》(v:(Long,DenseVector))},//mergeValue指的是。当接下来遇到vector的时候,将vector合并到c中去。前提是他们的key必须相同。//过程如下:(c:(Long,DenseVector),v:Vector => (c:(Long,DenseVector))) mergeValue = (c: (Long, DenseVector), v: Vector) => {requireNonnegativeValues(v) //判断向量的是否符合条件BLAS.axpy(1.0, v, c._2) //c._2=c._2+v(c._1 + 1L, c._2) //这个地方继续返回一个元祖,其中对于第一个值进行加1操作。这里的c._2表示的是c._2=c._2+v},//接下来对根据相同的key来合并多个c。//mergeCombiners。过程如下(c1:(Long, DenseVector),c2: (Long, DenseVector)) => c:(c:(Long,DenseVector))mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {BLAS.axpy(1.0, c2._2, c1._2) //c1._2=c1._2+c2._2(c1._1 + c2._1, c1._2) // 返回一个c。表示在一个key的情况下的每一个特征的数量之和加上其向量之和。}).collect().sortBy(_._1) //有一个排序操作是根据key来排序val numLabels = aggregated.length //aggregated的长度表示的是类别标签的个数var numDocuments = 0L //这个表示文档的数量//注意这个aggredaged的格式。首先它是一个数组。数组的元素的元祖的第二个值的格式为(Long,DenseVector)格式。//下面的这个n表示的是某一个特征下的数量。这个方法主要是对其进行汇总。总文档的数量进行汇总aggregated.foreach { case (_, (n, _)) =>numDocuments += n}//这个是用于计算特征的数量val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }//labels类别标签的列表。val labels = new Array[Double](numLabels)//pi类别的先验概率val pi = new Array[Double](numLabels)//这个表示theta这个特征在各个类别下的概率。是个二维数组,theta[i][j],i表示第i个类别,j表示第j个特征。val theta = Array.fill(numLabels)(new Array[Double](numFeatures))//计算总文档数量的对数值,用于计算theta值。在下面使用到。 val piLogDenom = math.log(numDocuments + numLabels * lambda)var i = 0aggregated.foreach { case (label, (n, sumTermFreqs)) =>//aggregated的每一行表示label,和计数还有其特征向量之和。labels(i) = label //表示取出第一个标签放在lebels(i)里面。pi(i) = math.log(n + lambda) - piLogDenom //计算先验概率,并取log。log((n+lamda)/(numDocuments+numlabels*lamda))val thetaLogDenom = modelType match {case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) //多项式模型,比如说计算类a下的文章的总数case Bernoulli =>math.log(n + 2.0 * lambda) //贝努力模型,比如说用于计算类a下的文章的总数。case _ =>throw new UnknownError(s"Invalid modelType: $modelType.")}var j = 0 //每一个i类别下,都需要计算j类别的条件概率,每次j从0开始while (j
}
3 最后是贝叶斯分类模型 class NaiveBayesModel
训练完成后,会生成贝叶斯分类模型。其包含如下参数:labels–类别标签列表. pi–每个类别的先验概率,theta–各个特征在各个类别下的先验概率,modelType–多项式或者是伯努利模型。
模型类主要包含一下的方法,即predict方法,load方法和save方法。
首先我们来看看其构造器:
class NaiveBayesModel private[spark] (@Since("1.0.0") val labels: Array[Double], //这个表示的是labels数组@Since("0.9.0") val pi: Array[Double], //这个表示的是先验概率数组@Since("0.9.0") val theta: Array[Array[Double]], //这个表示theta这个特征在各个类别下的概率。条件概率数组,二维数组。@Since("1.4.0") val modelType: String) //这个表示类别extends ClassificationModel with Serializable with Saveable private[mllib] def this(labels: Array[Double], pi: Array[Double], theta: Array[Array[Double]]) =this(labels, pi, theta, NaiveBayes.Multinomial)
接下来是predict方法,默认的输入参数的类型是RDD[Vector],这个方法内部调用的是predict(testData:Vector)方法。该方法的返回值是一个double类型。
//这个方法会将输入的rdd转换为向量,然后调用下一个predict方法
override def predict(testData: RDD[Vector]): RDD[Double] = {val bcModel = testData.context.broadcast(this) //广播一下模型,testData.mapPartitions { iter => //采用mappartition进行操作,对每一个分区进行操作。val model = bcModel.value //在每个分区里面获取广播变量值,iter.map(model.predict) //对分区的每一个元素,调用predict方法。调用的是下面一个方法。}}override def predict(testData: Vector): Double = {modelType match {case Multinomial => //如果是多项式类型的,则调用multinomialCalculationlabels(multinomialCalculation(testData).argmax) //注意这个地方,会调用求最大值,下面类似case Bernoulli => //如果是伯努利类型的,bernoulliCalculationlabels(bernoulliCalculation(testData).argmax)}}
下面是两个计算方法,分别是multinomialCalculation和bernoulliCalculation。
private def multinomialCalculation(testData: Vector) = {val prob = thetaMatrix.multiply(testData) //用条件概率矩阵,乘以样本向量。 theta*testDataBLAS.axpy(1.0, piVector, prob) //prob=1.0*piVector+prob (本来是相乘的,但是取log之后变成相加,结果是一样的。)prob //得到结果之后,去向量的最大值。}private def bernoulliCalculation(testData: Vector) = {testData.foreachActive((_, value) =>if (value != 0.0 && value != 1.0) { 如果不满足条件的话throw new SparkException(s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")})val prob = thetaMinusNegTheta.get.multiply(testData) //用条件概率矩阵,乘以样本向量。theta*testDataBLAS.axpy(1.0, piVector, prob) //prob=1.0*piVector+probBLAS.axpy(1.0, negThetaSum.get, prob)prob}
接下来就是最基本的加载和保存的方法了。分为save方法和load方法。
def load(sc: SparkContext, path: String): NaiveBayesModel = {val sqlContext = SQLContext.getOrCreate(sc)// Load Parquet data.val dataRDD = sqlContext.read.parquet(dataPath(path))// Check schema explicitly since erasure makes it hard to use match-case for checking.checkSchema[Data](dataRDD.schema)val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)assert(dataArray.length == 1, s"Unable to load NaiveBayesModel data from: ${dataPath(path)}")val data = dataArray(0)val labels = data.getAs[Seq[Double]](0).toArrayval pi = data.getAs[Seq[Double]](1).toArrayval theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArrayval modelType = data.getString(3)new NaiveBayesModel(labels, pi, theta, modelType)}
def save(sc: SparkContext, path: String, data: Data): Unit = {val sqlContext = SQLContext.getOrCreate(sc)import sqlContext.implicits._// Create JSON metadata.val metadata = compact(render(("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))// Create Parquet data.val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()dataRDD.write.parquet(dataPath(path))}
好了,谢谢大家,以上就是我对于spark mllib的朴素贝叶斯的代码的解析。