热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据Spark中决策树模型Pipeline的建立和两种验证方法(完整版)

文章目录[toc]一、数据预处理1、加载数据2、SparkSession读取CSV格式文件3、清洗数据4、特征处理4.1、StringIndexer4.2、OneHotEncode

文章目录

    • @[toc]
  • 一、数据预处理
        • 1、加载数据
        • 2、SparkSession读取CSV格式文件
        • 3、清洗数据
        • 4、特征处理
            • 4.1、StringIndexer
            • 4.2、OneHotEncoder
            • 4.3、VectorAssembler
  • 二、建模
      • 分类决策树DecisionTreeClassifier
  • 三、评估(ROC曲线)
  • 四、打包(ML Pipeline)
        • Step 1. 创建流程中 转换器和 模型学习器
        • Step 2. 创建Pipeline实例对象
        • step3. Pipeline 数据处理与训练模型
        • Step 4. PipelineModel模型预测
        • step5、PipelineModel模型保存于加载
        • step6、调用
  • 五、验证选择最优模型
        • 5.1、创建 TrainValidationSplit 实例对象
        • 5.1、Cross-Validation交叉验证
  • 六、提升:随即森林(RF算法)

数据链接

一、数据预处理

1、加载数据

# 导入包
import os
import time
from pyspark.sql import SparkSession# 实例化SparkSession对象,以本地模式是运行Spark程序
spark = SparkSession \.builder \.appName("PySpark_ML_Pipeline") \.master("local[4]")\.getOrCreate()print spark
print spark.sparkContext
'''


'''

2、SparkSession读取CSV格式文件

help(spark.read.csv)
# 读取数据集,
raw_df = spark.read.csv('./datas/train.tsv', header='true', sep='\t',\inferSchema='true')
# 显示条目数
print raw_df.count()
==>7395
raw_df.printSchema()# 由于字段太多,选择某些字段值
raw_df.select('url', 'alchemy_category', 'alchemy_category_score', \'label').show(10)

3、清洗数据

# 定义函数转换 ?转换为 0
def replace_question_func(x):return '0' if x == '?' else x# 注册函数
from pyspark.sql.functions import udf
replace_question = udf(replace_question_func)# col函数将 一个字符串转换为DataFrame中列, 获取对应DataFrame中此列的值
from pyspark.sql.functions import col# 使用自定义的函数,转换数据
df = raw_df.select(['url', 'alchemy_category'] +\[ replace_question(col(column)).cast('double')\.alias(column) for column in raw_df.columns[4:]])df.printSchema()df.select('url', 'alchemy_category', 'alchemy_category_score', \'label').show(10)

这里写图片描述

# 将数据集分为 训练集和测试集
train_df, test_df = df.randomSplit([0.7, 0.3])print train_df.cache().count()
print test_df.cache().count()
"""
5216
2179
"""



4、特征处理

1、alchemy_category类别特征数据转换第一特征转换器、StringIndexer将文字的类别特征 转换 数字第二特征转换器、OneHotEncoder将数值的 类别特征字段 转换为 多个字段的Vector
2、特征的组合第二特征转换器、VectorAssembler将多个特征整合到一起

4.1、StringIndexer

网址:http://spark.apache.org/docs/2.2.0/ml-features.html#stringindexer

# 导入模块
from pyspark.ml.feature import StringIndexer
help(StringIndexer)# 创建StringIndexer实例对象
"""参数说明:inputCol -> 要转换的字段名称outputCol -> 转换后的字段名称
"""
categoryIndexer = StringIndexer(inputCol='alchemy_category',\outputCol='alchemy_category_index')print type(categoryIndexer)
"""
==>
"""

调用StringIndexer类中的 fit 方法,获取到转换器Transformer

categoryTransformer = categoryIndexer.fit(df)
print type(categoryTransformer)# 使用 categoryTransformer 转换器 将所有的 train_df 进行转换
df1 = categoryTransformer.transform(train_df)df1.select('alchemy_category', 'alchemy_category_index').show(10)
"""
+------------------+----------------------+
| alchemy_category|alchemy_category_index|
+------------------+----------------------+
| ?| 0.0|
|arts_entertainment| 2.0|
| ?| 0.0|
| business| 3.0|
|arts_entertainment| 2.0|
| ?| 0.0|
| ?| 0.0|
| recreation| 1.0|
| business| 3.0|
|arts_entertainment| 2.0|
+------------------+----------------------+
only showing top 10 rows
"""df1.printSchema() #查看结构数据

4.2、OneHotEncoder

OneHotEncoder可以将一个数值的类别特征字段转换为多个字段的Vector向量

from pyspark.ml.feature import OneHotEncoder
# 创建 OneHotEncoder 实例对象
encoder = OneHotEncoder(inputCol='alchemy_category_index', outputCol='alchemy_category_index_vector')print type(encoder)
"""

"""df2 = encoder.transform(df1)df2.printSchema()df2.select('alchemy_category', 'alchemy_category_index',\'alchemy_category_index_vector').show(10)

这里写图片描述

4.3、VectorAssembler

特征的组合
~~~~~~~~        第二特征转换器、VectorAssembler,将多个特征整合到一起

from pyspark.ml.feature import VectorAssembler
assembler_inputs = ['alchemy_category_index_vector'] \+ raw_df.columns[4:-1]
print assembler_inputs"""
['alchemy_category_index_vector', 'alchemy_category_score',
'avglinksize', 'commonlinkratio_1', 'commonlinkratio_2',
'commonlinkratio_3', 'commonlinkratio_4', 'compression_ratio','embed_ratio', 'framebased', 'frameTagRatio', 'hasDomainLink',
'linkwordscore', 'news_front_page', 'non_markup_alphanum_characters',
'numberOfLinks', 'numwords_in_url', 'parametrizedLinkRatio',
'spelling_errors_ratio']
"""

######创建 VectorAssembler 实例对象,传递参数,指定合并哪些字段,输出的字段名称
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')
df3 = assembler.transform(df2)df3.printSchema()"""
+--------------------+-----+
| features|label|
+--------------------+-----+
|(35,[0,14,15,16,1...| 1.0|
|(35,[2,13,14,15,1...| 1.0|
|(35,[0,14,15,19,2...| 0.0|
|(35,[3,13,14,15,1...| 1.0|
|(35,[2,13,14,15,1...| 0.0|
+--------------------+-----+
only showing top 5 rows
"""df3.select('features').take(1)
"""
[Row(features=SparseVector(35,
{0: 1.0, 14: 2.1446, 15: 0.7969, 16: 0.3945, 17: 0.332,
18: 0.3203, 19: 0.5022, 22: 0.028, 24: 0.1898, 25: 0.2354,26: 1.0, 27: 1.0, 28: 17.0, 30: 10588.0, 31: 256.0, 32: 5.0, 33: 0.3828, 34: 0.1368}))]
"""

二、建模

分类决策树DecisionTreeClassifier

from pyspark.ml.classification import DecisionTreeClassifier# 使用决策树分类算法
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label',impurity='gini', maxDepth=5, maxBins=32)# 将 训练数据 应用到 算法
dtc_model = dtc.fit(df3)# 使用模型预测
df4 = dtc_model.transform(df3)
df4.select('label', 'prediction', 'rawPrediction', 'probability').show(20, truncate=False)

labelpredictionrawPredictionprobability
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
0.00.0[38.0,1.0][0.9743589743589743,0.02564102564102564]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
0.00.0[95.0,28.0][0.7723577235772358,0.22764227642276422]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.00.0[144.0,95.0][0.602510460251046,0.39748953974895396]
0.00.0[363.0,146.0][0.7131630648330058,0.2868369351669941]
0.00.0[86.0,23.0][0.7889908256880734,0.21100917431192662]
0.00.0[144.0,95.0][0.602510460251046,0.39748953974895396]
0.00.0[144.0,95.0][0.602510460251046,0.39748953974895396]
0.00.0[43.0,1.0][0.9772727272727273,0.022727272727272728]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
1.01.0[129.0,417.0][0.23626373626373626,0.7637362637362637]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
0.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]

only showing top 20 rows

三、评估(ROC曲线)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
# 创建 实例对象, 传递参数值
evaluator = BinaryClassificationEvaluator(labelCol='label',rawPredictionCol='rawPrediction')
# 计算指标 metricName="areaUnderROC"
auc = evaluator.evaluate(df4)
print auc
"""
0.6087142511
"""

总结上述开发流程:1、从原始数据 提取特征数据2、特征数据应用到算法,得到模型3、使用模型预测数据4、评估模型Pipeline:相当于一个“算法” -> 模型学习器包含两部分内容;-a. Estimator 模型学习器fit()-b. transformers 转换器transformer()
pipeline = Pipeline(Stages(.....))pipeline.fit().....
model.transfor().....

四、打包(ML Pipeline)

Step 1. 创建流程中 转换器和 模型学习器

# 1. 导入全部需要 模块
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
# a. StringIndexer
string_indexer = StringIndexer(inputCol='alchemy_category',\outputCol='alchemy_category_index')# b. OneHotEncoding
one_hot_encoder = OneHotEncoder(inputCol='alchemy_category_index',\outputCol='alchemy_category_index_vector')# c. VectorAessmbler
assembler_inputs = ['alchemy_category_index_vector'] \+ raw_df.columns[4:-1]
vector_assembler = VectorAssembler(inputCols=assembler_inputs,\outputCol='features')# d. DecisionTreeClassifier 模型学习器
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label',\impurity='gini', maxDepth=5, maxBins=32)

Step 2. 创建Pipeline实例对象

# 按照数据处理顺序
pipeline = Pipeline(stages=[string_indexer,one_hot_encoder, vector_assembler, dt])
pipeline.getStages()"""
[StringIndexer_43e8b50676a58dad4d05,OneHotEncoder_4bf2a31a6b4b12aebd78,VectorAssembler_4429bf16ed1cc6c14207,DecisionTreeClassifier_451682088ef8fcaa79ae]"""

step3. Pipeline 数据处理与训练模型

# 调用fit方法学,
pipleline_model = pipeline.fit(train_df)type(pipleline_model) #pyspark.ml.pipeline.PipelineModel
pipleline_model.stages[3]

Step 4. PipelineModel模型预测

predict_df = pipleline_model.transform(test_df)

step5、PipelineModel模型保存于加载

# 保存 模型
pipleline_model.save('./datas/dtc-model')

step6、调用

# 加载模型
from pyspark.ml.pipeline import PipelineModelload_pipeline_model = PipelineModel.load('./datas/dtc-model')
load_pipeline_model.stages[3]# 预测
load_pipeline_model.transform(test_df) \.select('label', 'prediction', 'rawPrediction',\'probability').show(20, truncate=False)

labelpredictionrawPredictionprobability
0.00.0[361.0,300.0][0.546142208774584,0.45385779122541603]
1.00.0[144.0,95.0][0.602510460251046,0.39748953974895396]
0.01.0[0.0,8.0][0.0,1.0]
1.01.0[129.0,417.0][0.23626373626373626,0.7637362637362637]
0.00.0[363.0,146.0][0.7131630648330058,0.2868369351669941]
0.00.0[363.0,146.0][0.7131630648330058,0.2868369351669941]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[129.0,417.0][0.23626373626373626,0.7637362637362637]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
1.01.0[27.0,177.0][0.1323529411764706,0.8676470588235294]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
0.00.0[363.0,146.0][0.7131630648330058,0.2868369351669941]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.01.0[909.0,1104.0][0.45156482861400893,0.5484351713859911]
1.00.0[361.0,300.0][0.546142208774584,0.45385779122541603]
0.00.0[86.0,23.0][0.7889908256880734,0.21100917431192662]

only showing top 20 rows

五、验证选择最优模型

5.1、创建 TrainValidationSplit 实例对象

(训练检验分离选择最优)
导入模块

from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

构建一个 决策树分类算法 网格参数

"""调整三个参数:-1. 不纯度度量-2. 最多深度-3. 最大分支数
"""
param_grid = ParamGridBuilder() \.addGrid(dt.impurity, ['gini', 'entropy']) \.addGrid(dt.maxDepth, [5, 10, 20]) \.addGrid(dt.maxBins, [8, 16, 32]) \.build()print type(param_grid)
for param in param_grid:print param

针对二分类创建模型评估器

binary_class_evaluator = BinaryClassificationEvaluator(labelCol='label',\rawPredictionCol='rawPrediction')

创建 TrainValidationSplit 实例对象

"""__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, seed=None)参数解释:estimator:模型学习器,针对哪个算法进行调整超参数,这里是DTestimatorParamMaps:算法调整的参数组合evaluator:评估模型的评估器,比如二分类的话,使用auc面积trainRatio:训练集与验证集 所占的比例,此处的值表示的是 训练集比例
"""train_validataion_split = TrainValidationSplit(estimator=dt,evaluator=binary_class_evaluator, estimatorParamMaps=param_grid, trainRatio=0.8)type(train_validataion_split)
#pyspark.ml.tuning.TrainValidationSplit

建立新的Pipeline实例对象

#使用 train_validataion_split 代替 原先 dt
tvs_pipeline = Pipeline(stages=[string_indexer, \one_hot_encoder, vector_assembler, \train_validataion_split])
# tvs_pipeline 进行数据处理、模型训练(找到最佳模型)
tvs_pipeline_model = tvs_pipeline.fit(train_df)best_model = tvs_pipeline_model.stages[3].bestModel
"""
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_\
451682088ef8fcaa79ae) of depth 20 with 1851 nodes
"""

评估最佳模型

predictions_df = tvs_pipeline_model.transform(test_df)model_auc = binary_class_evaluator.evaluate(predictions_df)
print model_auc0.649609702764

5.1、Cross-Validation交叉验证

"""__init__(self, estimator=None, estimatorParamMaps=None, \evaluator=None, numFolds=3, seed=None)假设 K-Fold的CrossValidation交叉验证 K = 3,将数据分为3个部分:1、A + B作为训练,C作为验证2、B + C作为训练,A作为验证3、A + C最为训练,B作为验证"""# 导入模块
from pyspark.ml.tuning import CrossValidator
# 构建 CrossValidator实例对象,设置相关参数
cross_validator = CrossValidator(estimator=dt, \evaluator=binary_class_evaluator,\estimatorParamMaps=param_grid, numFolds=3)# 创建Pipeline
cv_pipeline = Pipeline(stages=[string_indexer, one_hot_encoder, \vector_assembler, cross_validator])

使用 cv_pipeline 进行训练与验证(交叉)

cv_pipeline_model = cv_pipeline.fit(train_df)

查看最佳模型

best_model = cv_pipeline_model.stages[3].bestModel
"""
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_ \
451682088ef8fcaa79ae) of depth 10 with 527 nodes
"""

使用测试集评估最佳模型

cv_predictions = cv_pipeline_model.transform(test_df)
cv_model_auc = binary_class_evaluator.evaluate(cv_predictions)
print cv_model_auc

六、提升:随即森林(RF算法)

# 导入随机森林分类算法模块
from pyspark.ml.classification import RandomForestClassifier# 创建RFC实例对象
rfc = RandomForestClassifier(labelCol='label', \featuresCol='features',\numTrees=10, \featureSubsetStrategy="auto",\maxDepth=5, \maxBins=32, \impurity="gini")# 创建Pipeline实例对象
rfc_pipeline = Pipeline(stages=[string_indexer, one_hot_encoder, \vector_assembler, rfc])# 使用训练数据训练模型
rfc_pipeline_model = rfc_pipeline.fit(train_df)# 预测
rfc_predictions = rfc_pipeline_model.transform(test_df)rfc_model_auc = binary_class_evaluator.evaluate(rfc_predictions)
print rfc_model_auc
"""
0.716242043615
"""


推荐阅读
  • Mysql MySqlBulkLoader在.NET平台下的批量插入
    批量导入publicboolTranBatchImpo ... [详细]
  • (一)javax.mail.Session:Session类代表JavaMail中的一次邮件会话.每个基于JavaMail的应用程序至少有一次会话,也可以产生多次会话.发送邮件之前 ... [详细]
  • #import挂载对象所需要的参数(UIAlertView挂载对象)staticconstcharkRepresente ... [详细]
  • 我正在使用数组列表通过构建一个交互式菜单供用户选择来存储来自用户输入的值。到目前为止,我的两个选择是为用户提供向列表输入数据和读取列表的全部内容。到目前为止,我创建的代码由两个类组成。 ... [详细]
  • 在这一期的SendMessage函数应用中,我将向大家介绍如何利用消息函数来扩展树型列表(TreeView)控件的功能相信对于树型列表控件大家十分的熟悉, ... [详细]
  • IDEA实用插件Lombok
    LombokLombok是一个可以通过简单的注解形式来帮助我们简化消除一些必须有但显得很臃肿的Java代码的工具,通过使用对应的注解,可以在编译源码的时候生成对应的方法。通常,我们所定义的对象和b ... [详细]
  • 使用RSACryptoServiceProvider进行公钥加密我已经在CodeProject上发表了一篇文章,解释了如何使用RSA提供程序进行加密和解密:RSA私钥加密虽然200 ... [详细]
  • Java发布webservice应用并发送SOAP请求调用
    webservice框架有很多,比如axis、axis2、cxf、xFire等等,做服务端和做客户端都可行,个人感觉使用这些框架的好处是减少了对于接口信息的解析,最主要的是减少了对于传递于网络中XML ... [详细]
  • C#的Type对象的简单应用
    通过Type对象可以获取类中所有的公有成员直接贴代码:classMyClass{privatestringname;privateintid;publicstringcity;pu ... [详细]
  • 字符串匹配: BF与KMP算法
    文章目录一.BF算法1.算法思想2.代码实现二.KMP算法1.算法思想概述2.理解基于最长相等前后缀进行匹配3.代码中如何实现next数组5.代码实现6.next数组的优化一.BF ... [详细]
  • Flex中使用filter过滤数据 ... [详细]
  • 1.什么是hashcode方法?hashcode方法返回对象的哈希码值在应用程序的执行期间,只要对象的equals方法的比较操作所用到的信息没有改变& ... [详细]
  • IOSUITableView解析(一)
    UITableView的作用由于Iphone的大小有限,所以UITableView的作用是巨大的。比如QQ,微博等应用都用到了该控件。UITableVi ... [详细]
  • 本文分析HashMap的实现原理。数据结构(散列表)HashMap是一个散列表(也叫哈希表),用来存储键值对( ... [详细]
  • 第38天:Python decimal 模块
    by程序员野客在我们开发工作中浮点类型的使用还是比较普遍的,对于一些涉及资金金额的计算更是不能有丝毫误差,Python的decimal模块为浮点型精确计算提供了支持。1简介deci ... [详细]
author-avatar
starry-night--_848
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有