自然语言处理是全球数据科学团队的重要过程之一。随着数据的不断增长,大多数组织已经转移到大数据平台,如apachehadoop和AWS、Azure和GCP等云产品。
这些平台不仅能够处理大数据,使组织能够对非结构化数据(如文本分类)进行大规模分析。但在机器学习方面,大数据系统和机器学习工具之间仍然存在差距。
流行的机器学习python库,如scikit-learn和Gensim,经过高度优化,可以在单节点计算机上执行,而不是为分布式环境设计的。
Apache Spark MLlib是许多帮助弥合这一差距的工具之一,它提供了大多数机器学习模型,如线性回归、Logistic回归、支持向量机、随机森林、K-means、LDA等,以执行最常见的机器学习任务。
除了机器学习算法,Spark MLlib还提供了大量的特征变换器,如Tokenizer、StopWordRemover、n-grams和countvector、TF-IDF和Word2Vec等。
虽然这些转换器和提取器足以构建基本的NLP管道,但是要构建一个更全面和生产级的管道,我们需要更先进的技术,如词干分析、词法化、词性标记和命名实体识别。
Spark NLP提供了各种注释器来执行高级NLP任务。有关更多信息,请在网站上查看注释器列表及其用法
https://nlp.johnsnowlabs.com/docs/en/annotators。
让我们继续看看如何在AWS EMR上设置Spark NLP。
1.在启动EMR集群之前,我们需要创建一个引导操作。引导操作用于设置其他软件或自定义群集节点的配置。以下是可用于在EMR集群上设置Spark NLP的引导操作,
#!/bin/bashsudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenvsudo python36 -m pip install --upgrade pip#sudo python36 -m pip install pandas#sudo python36 -m pip install boto3#sudo python36 -m pip install re#sudo python36 -m pip install spark-nlp==2.4.5
创建shell脚本之后,将该脚本复制到AWS S3中的一个位置。你还可以根据需要安装其他python包。
2.我们可以使用AWS控制台、API或python中的boto3库来启动EMR集群。使用Python的好处是,无论何时需要实例化集群或将其添加到工作流中,都可以重用代码。
下面是实例化EMR集群的python代码。
import boto3region_name='region_name'def get_security_group_id(group_name, region_name): ec2 = boto3.client('ec2', region_name=region_name) response = ec2.describe_security_groups(GroupNames=[group_name]) return response['SecurityGroups'][0]['GroupId']emr = boto3.client('emr', region_name=region_name)cluster_response = emr.run_job_flow( Name='cluster_name', # 更新值 ReleaseLabel='emr-5.27.0', LogUri='s3_path_for_logs', # 更新值 Instances={ 'InstanceGroups': [ { 'Name': "Master nodes", 'Market': 'ON_DEMAND', 'InstanceRole': 'MASTER', 'InstanceType': 'm5.2xlarge', # 根据要求进行变更 'InstanceCount': 1 #对于主节点高可用性,设置计数大于1 }, { 'Name': "Slave nodes", 'Market': 'ON_DEMAND', 'InstanceRole': 'CORE', 'InstanceType': 'm5.2xlarge', # 根据要求进行变更 'InstanceCount': 2 } ], 'KeepJobFlowAliveWhenNoSteps': True, 'Ec2KeyName' : 'key_pair_name', # 更新值 'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name) 'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name) }, BootstrapActions=[ { 'Name':'install_dependencies', 'ScriptBootstrapAction':{ 'Args':[], 'Path':'path_to_bootstrapaction_on_s3' # 更新值 } }], Steps = [], VisibleToAllUsers=True, JobFlowRole='EMR_EC2_DefaultRole', ServiceRole='EMR_DefaultRole', Applications=[ { 'Name': 'hadoop' }, { 'Name': 'spark' }, { 'Name': 'hive' }, { 'Name': 'zeppelin' }, { 'Name': 'presto' } ], Configurations=[ # YARN { "Classification": "yarn-site", "Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4", "yarn.nodemanager.pmem-check-enabled": "false", "yarn.nodemanager.vmem-check-enabled": "false"} }, # HADOOP { "Classification": "hadoop-env", "Configurations": [ { "Classification": "export", "Configurations": [], "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"} } ], "Properties": {} }, # SPARK { "Classification": "spark-env", "Configurations": [ { "Classification": "export", "Configurations": [], "Properties": {"PYSPARK_PYTHON":"/usr/bin/python3", "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"} } ], "Properties": {} }, { "Classification": "spark", "Properties": {"maximizeResourceAllocation": "true"}, "Configurations": [] }, { "Classification": "spark-defaults", "Properties": { "spark.dynamicAllocation.enabled": "true" #default is also true } } ] )
注意:请确保你对用于日志记录和存储引导操作脚本的S3 bucket具有正确的访问权限。
现在我们已经准备好集群了,让我们使用Spark NLP和Spark MLlib在BBC数据上构建一个简单的文本分类示例。
我们将导入所需的库并使用不同的配置参数初始化spark会话。配置值取决于我的本地环境。相应地调整参数。
# 导入Spark NLPfrom sparknlp.base import *from sparknlp.annotator import *from sparknlp.pretrained import PretrainedPipelineimport sparknlpfrom pyspark.sql import SparkSessionfrom pyspark.ml import Pipeline# 使用Spark NLP启动Spark会话#spark = sparknlp.start()spark = SparkSession.builder .appName("BBC Text Categorization") .config("spark.driver.memory","8G") change accordingly .config("spark.memory.offHeap.enabled",True) .config("spark.memory.offHeap.size","8G") .config("spark.driver.maxResultSize", "2G") .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5") .config("spark.kryoserializer.buffer.max", "1000M") .config("spark.network.timeout","3600s") .getOrCreate()
我们将使用BBC的数据。你可以从这个链接下载数据。下载以下数据后,使用spark代码加载;
https://www.kaggle.com/yufengdev/bbc-text-categorization?#Get-the-data
# 文件位置和类型file_location = r'pathobbc-text.csv'file_type = "csv"# CSVinfer_schema = "true"first_row_is_header = "true"delimiter = ","df = spark.read.format(file_type) .option("inferSchema", infer_schema) .option("header", first_row_is_header) .option("sep", delimiter) .load(file_location)df.count()
与python使用scikit learn分割数据不同,Spark Dataframe有一个内置函数randomSplit()来执行相同的操作。
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)
randomSplit()函数需要两个参数viz。权重数组和seed。在我们的例子中,我们将使用70/30分割,其中70%是训练数据,30%是测试数据。
让我们继续使用Spark NLP构建NLP管道。Spark NLP最大的优点之一是它与Spark MLLib模块本机集成,有助于构建由transformers和estimators组成的综合ML管道。
这个管道可以包括诸如CountVectorizer或HashingTF和IDF之类的特征提取模块。我们还可以在这个管道中包含一个机器学习模型。
下面是由具有特征提取和机器学习模型的NLP管道组成的示例;
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToStringfrom pyspark.ml.classification import LogisticRegressionfrom pyspark.ml.evaluation import MulticlassClassificationEvaluator# 转换text列为nlp文件document_assembler = DocumentAssembler() .setInputCol("text") .setOutputCol("document")#将文档转换为标识数组tokenizer = Tokenizer() .setInputCols(["document"]) .setOutputCol("token")# 清理标识normalizer = Normalizer() .setInputCols(["token"]) .setOutputCol("normalized")# 删除停用词stopwords_cleaner = StopWordsCleaner() .setInputCols("normalized") .setOutputCol("cleanTokens") .setCaseSensitive(False)stemmer = Stemmer() .setInputCols(["cleanTokens"]) .setOutputCol("stem")# 将自定义文档结构转换为标识数组。finisher = Finisher() .setInputCols(["stem"]) .setOutputCols(["token_features"]) .setOutputAsArray(True) .setCleanAnnotations(False)# 生成频率hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)# 生成逆文档频率idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)# 将标签(字符串)转换为整数。label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")# 定义一个简单的多项式逻辑回归模型。尝试不同的超参数组合,看看哪个更适合你的数据。你也可以尝试不同的算法来比较分数。lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)# 将索引(整数)转换为相应的类标签label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")# 定义nlp管道nlp_pipeline = Pipeline( stages=[document_assembler, tokenizer, normalizer, stopwords_cleaner, stemmer, finisher, hashingTF, idf, label_stringIdx, lr, label_to_stringIdx])
现在我们的NLP管道已经准备好了,让我们根据训练数据训练我们的模型。
# 在训练数据上拟合管道pipeline_model = nlp_pipeline.fit(trainingData)
一旦训练完成,我们就可以预测测试数据上的类标签。
# 对测试数据进行预测predictions = pipeline_model.transform(testData)
对训练后的模型进行评估对于理解模型如何在看不见的数据上运行是非常重要的。我们将看到3个流行的评估指标,准确度、精确度和召回率。
# 导入evaluatorfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator( labelCol="label", predictionCol="prediction", metricName="accuracy")accuracy = evaluator.evaluate(predictions)print("Accuracy = %g" % (accuracy))print("Test Error = %g " % (1.0 - accuracy))
evaluator = MulticlassClassificationEvaluator( labelCol="label", predictionCol="prediction", metricName="weightedPrecision")accuracy = evaluator.evaluate(predictions)print("Accuracy = %g" % (accuracy))print("Test Error = %g " % (1.0 - accuracy))
evaluator = MulticlassClassificationEvaluator( labelCol="label", predictionCol="prediction", metricName="weightedRecall")accuracy = evaluator.evaluate(predictions)print("Accuracy = %g" % (accuracy))print("Test Error = %g " % (1.0 - accuracy))
根据业务用例,你可以决定使用哪个度量来评估模型。
例如.如果一个机器学习模型被设计用来根据某些参数来检测癌症,那么最好使用召回率,因为公司无法承受假负例(一个患有癌症但模型没有检测到癌症的人),而如果机器学习模型旨在生成用户推荐,公司可以负担得起误报(10条建议中有8条符合用户配置文件),因此可以使用精确度作为评估指标。
在成功地训练、测试和评估模型之后,你可以将模型保存到磁盘,并在不同的Spark应用程序中使用它。要将模型保存到光盘,请使用以下代码;
pipeline_model.save('/path/to/storage_location')
Spark NLP提供了大量的注释器和转换器来构建数据预处理管道。Sparl NLP与Spark MLLib无缝集成,使我们能够在分布式环境中构建端到端的自然语言处理项目。
在本文中,我们研究了如何在AWS EMR上安装Spark NLP并实现了BBC数据的文本分类。我们还研究了Spark MLlib中的不同评估指标,并了解了如何存储模型以供进一步使用。