热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

dataframe添加一列_在PySpark数据框中添加新列的5种方法

每天都在生成太多数据。尽管有时我们可以使用Rapids或Parallelization等工具来管理大数据,但如果您使用的是TB级数据,Spark是一个很

c5ea66f3cac895a9b1e333869da97714.png

每天都在生成太多数据。尽管有时我们可以使用Rapids或Parallelization等工具来管理大数据,但如果您使用的是TB级数据,Spark是一个很好的工具。尽管这篇文章解释了如何使用RDD和基本的Dataframe操作,但是我在使用PySpark Dataframes时错过了很多东西。只有当我需要更多功能时,我才阅读并提出多种解决方案来做一件事情。如何在Spark中创建新列?现在,这听起来微不足道,但请相信我,事实并非如此。您可能想要处理这么多数据,所以我很确定您最终将在工作流中使用大多数这些列创建过程。有时使用Pandas功能,有时使用基于RDD的分区,有时使用成熟的python生态系统。这篇文章将是关于"在Pyspark Dataframe中创建新列的多种方法"。如果您安装了PySpark,则可以跳过下面的"入门"部分。Spark入门我知道很多人不会在系统中安装Spark来尝试和学习。但是安装Spark本身就是一件令人头疼的事情。由于我们想了解它是如何工作的以及如何使用它,因此建议您在此处与社区版一起在线使用Databricks上的Spark。不用担心,它是免费的,尽管资源较少,但是对于我们来说,出于学习目的,它现在就适用。

9f3b4fdcd6209517666cce5bec3e6286.png

一旦注册并登录,将显示以下屏幕。

4e367d1c6dc499e7750d88ac9050b45c.png

您可以在此处启动新笔记本。选择Python笔记本,并为笔记本命名。启动新笔记本并尝试执行任何命令后,笔记本将询问您是否要启动新群集。做吧下一步将检查sparkcontext是否存在。要检查sparkcontext是否存在,您必须运行以下命令:sc

8ca4445bc6eac68cd33206b25609ea2b.png

这意味着我们已经设置了可以运行Spark的笔记本。

数据在这里,我将处理Movielens ml-100k.zip数据集。1000位用户观看1700部电影时获得100,000个评分。在此压缩文件夹中,我们将专门使用的文件是评估文件。该文件名保留为" u.data"如果要上载此数据或任何数据,可以单击左侧的"数据"选项卡,然后使用提供的GUI添加数据。

262c4da761365a05c22bdca643d5659b.png

然后,我们可以使用以下命令加载数据:

ratings = spark.read.load("/FileStore/tables/u.data",format="csv", sep="\t", inferSchema="true", header="false")ratings = ratings.toDF(*['user_id', 'movie_id', 'rating', 'unix_timestamp'])外观如下:

ratings.show()

cc07af946e1182fce92a1074aba222b4.png

好的,现在我们准备开始我们感兴趣的部分。如何在PySpark Dataframe中创建一个新列?

使用Spark本机函数在PySpark DataFrame中创建新列的最pysparkish方法是使用内置函数。这是创建新列的最高效的编程方式,因此,这是我想进行某些列操作时首先要去的地方。我们可以将.withcolumn与PySpark SQL函数一起使用来创建新列。本质上,您可以找到已经使用Spark函数实现的String函数,Date函数和Math函数。我们可以将spark函数导入为:

import pyspark.sql.functions as F我们的第一个函数F.col函数使我们可以访问列。因此,如果我们想将一栏乘以2,可以将F.col用作:

ratings_with_scale10 = ratings.withColumn("ScaledRating", 2*F.col("rating"))ratings_with_scale10.show()

812ed54dc2c470da4674db84e360790e.png

我们还可以使用数学函数,例如F.exp函数:

ratings_with_exp = ratings.withColumn("expRating", 2*F.exp("rating"))ratings_with_exp.show()

c970864c65576fa02280d1124eec2356.png

此模块中提供了许多其他功能,足以满足大多数简单的用例。您可以在此处查看功能列表。

Spark UDF有时我们想对一列或多列做复杂的事情。可以将其视为对PySpark数据框到单列或多列的映射操作。尽管Spark SQL函数确实解决了许多有关创建列的用例,但只要我想使用更成熟的Python功能时,我都会使用Spark UDF。要使用Spark UDF,我们需要使用F.udf函数将常规的python函数转换为Spark UDF。我们还需要指定函数的返回类型。在此示例中,返回类型为StringType()

import pyspark.sql.functions as Ffrom pyspark.sql.types import *defsomefunc(value): if value <3: return &#39;low&#39; else: return &#39;high&#39;#convert to a UDF Function by passing in the function and return type of function udfsomefunc &#61; F.udf(somefunc, StringType())ratings_with_high_low &#61; ratings.withColumn("high_low", udfsomefunc("rating"))ratings_with_high_low.show()

e9b4fd96f8e2809c73d753fcc4b707a8.png

使用RDD有时&#xff0c;Spark UDF和SQL函数对于特定用例而言都是不够的。您可能想利用Spark RDD获得的更好的分区。或者&#xff0c;您可能想在Spark RDD中使用组函数。您可以使用此方法&#xff0c;主要是在需要访问python函数内部spark数据框中的所有列时。无论如何&#xff0c;我发现使用RDD创建新列的这种方式对于有经验的RDD(这是Spark生态系统的基本组成部分)的人们非常有用。下面的过程利用该功能在Row和pythondict对象之间进行转换。我们将行对象转换为字典。按照我们的习惯使用字典&#xff0c;然后将该字典再次转换回行。

import mathfrom pyspark.sql import Rowdefrowwise_function(row): # convert row to dict: row_dict &#61; row.asDict() # Add a new key in the dictionary with the new column name and value. row_dict[&#39;Newcol&#39;] &#61; math.exp(row_dict[&#39;rating&#39;]) # convert dict to row: newrow &#61; Row(**row_dict) # return new row return newrow# convert ratings dataframe to RDDratings_rdd &#61; ratings.rdd# apply our function to RDDratings_rdd_new &#61; ratings_rdd.map(lambda row: rowwise_function(row))# Convert RDD Back to DataFrameratings_new_df &#61; sqlContext.createDataFrame(ratings_rdd_new)ratings_new_df.show()

8de7f9a1cb046cd8cac9ffa2dd1b9bfe.png

Pandas UDFSpark版本2.3.1中引入了此功能。这使您可以在Spark中使用Pands功能。我通常在需要在Spark数据帧上运行groupby操作或需要创建滚动功能并想使用Pandas滚动功能/窗口功能的情况下使用它。我们使用它的方式是使用F.pandas_udf装饰器。我们在这里假设该函数的输入将是一个熊猫数据框。我们需要从该函数依次返回一个Pandas数据框。这里唯一的复杂性是我们必须为输出数据框提供一个架构。我们可以使用以下格式来实现。

# Declare the schema for the output of our functionoutSchema &#61; StructType([StructField(&#39;user_id&#39;,IntegerType(),True),StructField(&#39;movie_id&#39;,IntegerType(),True),StructField(&#39;rating&#39;,IntegerType(),True),StructField(&#39;unix_timestamp&#39;,IntegerType(),True),StructField(&#39;normalized_rating&#39;,DoubleType(),True)])# decorate our function with pandas_udf decorator&#64;F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)defsubtract_mean(pdf): # pdf is a pandas.DataFrame v &#61; pdf.rating v &#61; v - v.mean() pdf[&#39;normalized_rating&#39;] &#61;v return pdfrating_groupwise_normalization &#61; ratings.groupby("movie_id").apply(subtract_mean)rating_groupwise_normalization.show()

e2c58c3c2ad4f51837397fd23b1be823.png

我们还可以利用它在每个火花节点上训练多个单独的模型。为此&#xff0c;我们复制数据并为每个复制提供一个键和一些训练参数&#xff0c;例如max_depth等。然后&#xff0c;我们的函数将使用熊猫Dataframe&#xff0c;运行所需的模型&#xff0c;然后返回结果。结构如下所示。

# 0. Declare the schema for the output of our functionoutSchema &#61; StructType([StructField(&#39;replication_id&#39;,IntegerType(),True),StructField(&#39;RMSE&#39;,DoubleType(),True)])# decorate our function with pandas_udf decorator&#64;F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)defrun_model(pdf): # 1. Get hyperparam values num_trees &#61; pdf.num_trees.values[0] depth &#61; pdf.depth.values[0] replication_id &#61; pdf.replication_id.values[0] # 2. Train test split Xtrain,Xcv,ytrain,ycv &#61; train_test_split..... # 3. Create model using the pandas dataframe clf &#61; RandomForestRegressor(max_depth &#61; depth, num_trees&#61;num_trees,....) clf.fit(Xtrain,ytrain) # 4. Evaluate the model rmse &#61; RMSE(clf.predict(Xcv,ycv) # 5. return results as pandas DF res &#61;pd.DataFrame({&#39;replication_id&#39;:replication_id,&#39;RMSE&#39;:rmse}) return resresults &#61; replicated_data.groupby("replication_id").apply(run_model)以上只是一个想法&#xff0c;而不是一个有效的代码。尽管应该稍作修改。

使用SQL对于喜欢SQL的人&#xff0c;甚至可以使用SQL创建列。为此&#xff0c;我们需要注册一个临时SQL表&#xff0c;然后使用带有附加列的简单选择查询。一个人也可以用它来进行联接。

ratings.registerTempTable(&#39;ratings_table&#39;)newDF &#61; sqlContext.sql(&#39;select *, 2*rating as newCol from ratings_table&#39;)newDF.show()

116d1348213fc6fe4092b2588a1c49c4.png

希望我已经很好地介绍了列创建过程&#xff0c;以帮助您解决Spark问题。

- END -

文源网络&#xff0c;仅供学习之用&#xff0c;如有侵权&#xff0c;联系删除。往期精彩

ed16a11e6244807d4fce86da5af0b052.png

◆  50款开源工具你都用过吗&#xff1f;

◆  python&#43;C、C&#43;&#43;混合编程的应用

◆  python网络爬虫的基本原理详解

◆  Python自动操控excel&#xff0c;一小时解决你一天的工作

◆  如何用Python增强Excel&#xff0c;减少处理复杂数据的痛苦&#xff1f;

2c0fe903de232fcff6aa765e5671400d.png




推荐阅读
  • 如何在Spark数据排序过程中有效避免内存溢出(OOM)问题
    本文深入探讨了在使用Spark进行数据排序时如何有效预防内存溢出(OOM)问题。通过具体的代码示例,详细阐述了优化策略和技术手段,为读者在实际工作中遇到类似问题提供了宝贵的参考和指导。 ... [详细]
  • 本文探讨了在不解压的情况下,如何高效地从包含文本文件的.gz压缩文件中查找特定字符串的方法。通过利用特定的工具和技术,可以在保持文件压缩状态的同时,快速定位和检索所需信息,提高处理大规模数据集时的效率和性能。 ... [详细]
  • 在Linux环境下编译安装Heartbeat时,常遇到依赖库缺失的问题。为确保顺利安装,建议预先通过yum安装必要的开发库,如glib2-devel、libtool-ltdl-devel、net-snmp-devel、bzip2-devel和ncurses-devel等。这些库是编译过程中不可或缺的组件,能够有效避免编译错误,确保Heartbeat的稳定运行。 ... [详细]
  • C#编程指南:实现列表与WPF数据网格的高效绑定方法 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • 成功实现Asp.Net MVC3网站与MongoDB数据库的高效集成
    我们成功地构建了一个基于Asp.NET MVC3框架的网站,并实现了与MongoDB数据库的高效集成。此次更新不仅完善了基本的创建和显示功能,还全面实现了数据的增删改查操作。在创建功能方面,我们修复了之前代码中的错误,确保每个属性都能正确生成。此外,我们还对数据模型进行了优化,以提高系统的性能和稳定性。 ... [详细]
  • 投融资周报 | Circle 达成 4 亿美元融资协议,唯一艺术平台 A 轮融资超千万美元 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 探索聚类分析中的K-Means与DBSCAN算法及其应用
    聚类分析是一种用于解决样本或特征分类问题的统计分析方法,也是数据挖掘领域的重要算法之一。本文主要探讨了K-Means和DBSCAN两种聚类算法的原理及其应用场景。K-Means算法通过迭代优化簇中心来实现数据点的划分,适用于球形分布的数据集;而DBSCAN算法则基于密度进行聚类,能够有效识别任意形状的簇,并且对噪声数据具有较好的鲁棒性。通过对这两种算法的对比分析,本文旨在为实际应用中选择合适的聚类方法提供参考。 ... [详细]
  • 在过去,我曾使用过自建MySQL服务器中的MyISAM和InnoDB存储引擎(也曾尝试过Memory引擎)。今年初,我开始转向阿里云的关系型数据库服务,并深入研究了其高效的压缩存储引擎TokuDB。TokuDB在数据压缩和处理大规模数据集方面表现出色,显著提升了存储效率和查询性能。通过实际应用,我发现TokuDB不仅能够有效减少存储成本,还能显著提高数据处理速度,特别适用于高并发和大数据量的场景。 ... [详细]
  • 在处理大图片时,PHP 常常会遇到内存溢出的问题。为了避免这种情况,建议避免使用 `setImageBitmap`、`setImageResource` 或 `BitmapFactory.decodeResource` 等方法直接加载大图。这些函数在处理大图片时会消耗大量内存,导致应用崩溃。推荐采用分块处理、图像压缩和缓存机制等策略,以优化内存使用并提高处理效率。此外,可以考虑使用第三方库如 ImageMagick 或 GD 库来处理大图片,这些库提供了更高效的内存管理和图像处理功能。 ... [详细]
  • 揭秘腾讯云CynosDB计算层设计优化背后的不为人知的故事与技术细节
    揭秘腾讯云CynosDB计算层设计优化背后的不为人知的故事与技术细节 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • 利用 Python 中的 Altair 库实现数据抖动的水平剥离分析 ... [详细]
  • 在循环读取文本文件时,经常会遇到一些常见的错误,如日期格式不正确、文件路径错误等。本文详细分析了这些问题,并提供了具体的解决方法,包括如何正确处理日期字符串和确保文件路径的准确性。通过这些方法,可以有效提高数据读取的稳定性和可靠性。 ... [详细]
author-avatar
请叫我浪漫先生_858
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有