作者:土豆小妈姐_645 | 来源:互联网 | 2023-08-31 19:38
由于Scala的学习成本有些高,所以,以下的demo都是基于python的。如果想了解spark架构,可以移步到博客点击打开链接Wordcount介绍作为类似于hello
由于Scala的学习成本有些高,所以,以下的demo都是基于python的。如果想了解spark架构,可以移步到博客点击打开链接
Wordcount介绍
作为类似于hello word一样经典的入门代码,wordcount主要是完成词频统计的,在spark框架中,它仍然是map和reduce两个步骤,但是代码的很简洁。
代码如下:
#!bin/lib/python
import sys
from operator import add
from pyspark import SparkContext
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
if __name__ == "__main__":
//定义上下文环境
sc = SparkContext(appName="PythonWordCount")
//从输入读取数据
lines = sc.textFile("hdfs://yz-cpu-vm001.hogpu.cc:8020/user/yichen.gong/data/mnli_data/glove.840B.300d.txt",1)
//数据处理,划分,map,reduce
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x,1)) \
.reduceByKey(add)
output = counts.collect()
//结果写回hdfs
counts.saveAsTextFile("hdfs://yz-cpu-vm001.hogpu.cc:8020/user/laipeng.han/output")
for (word,count) in output:
print "%s: %i" %(word,count)
sc.stop()
其提交指令为:
export HADOOP_CONF_DIR=/usr/hdp/2.5.0.0-1245/hadoop/conf
spark-submit \
--master yarn \
--deploy-mode cluster \
--queue debugqueue \
/home/users/laipeng.han/wordcount.py
以下是必要的参数,参数说明:
master:The master URL for the cluster,由于我用到架构的调度器为yarn,所以这里我们只能写yarn。
deploy-mode :Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client) ,有两个参数,master 和 client,Driver运行的地方,如果选择client,其日志会打印在终端界面。要注意的是,如果设置这个参数,那么需要同时指定上面 master 为 yarn。
queue:提交的任务队列
K-means介绍
很经典的基于距离的聚类算法,首先,我们需要先预测类簇的个数,也就是K的值,然后基于这K个中心点,将所有的点根据距离的远近进行划分,然后对每一个类簇重新计算其质心,然后重新聚类,迭代此过程直到聚类完成。由于它是一个不停的迭代的过程,所以当数据量大的时候还是很适合Spark来处理的。
代码如下:
#!/bin/lib/python
from __future__ import print_function
from numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
if __name__ == "__main__":
sc = SparkContext(appName="FaceKMeansTest") # SparkContext
# Load and parse the data
data = sc.textFile("hdfs://yz-cpu-vm001.hogpu.cc:8020/user/zhizhong.su/face_fea/feas.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
#print data
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2000, maxIteratiOns=10, initializatiOnMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
#clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
#sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
# $example off$
sc.stop()
提交任务的指令同上。
注:kill job的指令为:yarn application –kill applicationID