热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

VirginHyperloopOne使用Koalas将处理时间降到几分钟无缝将pandas切换成ApacheSpark

Koalas为什么值得你去尝试。只需要修改不到1%的pandas代码,我们就能将我们以前的代码跑在Koalas和Spark上。我们将处理时间提升了10倍,从几小时下降到了几分钟。并


Virgin Hyperloop One(超级高铁公司)是一家从事超级高铁研究的公司,致力于能让高铁达到飞机的速度并且拥有更低的成本。为了能够制造一个商业的系统,我们需要收集并且分析非常大量的各种不同的数据,包括各种运行测试数据,多种模拟数据,技术设施数据,甚至社会经济数据等等。我们之前绝大部分处理数据的代码都是基于pandas使用python脚本来进行处理。之所以写这篇文档是因为我们想分享我们如何使用Koalas在很少修改代码的情况下来扩展我们的处理能力和节省大量处理时间的。

随着我们的业务不断的增长我们的数据量也在不断的增长。我们的数据处理范围越来越大,复杂程度越来越高,这导致我们基于pandas的python脚本越来越慢,知道慢到不能满足我们的商业需求。所以我们调研了Spark,希望使用Spark能够带来更快的处理时间并且能够提供按需灵活弹性的能力。我们尝试这样做了,但是很快我们发现切换到Spark的过程中,我们必须付出非常多的时间把我们之前的基于pandas的python代码修改成基于PySpark的代码。我们意识到我们需要一套能够不需要修改过多代码就能迁移到Spark上的解决方案。我们非常高兴的发现了这个解决方案:Databricks最近开源的Koalas。

Kolas的Readme中是这样写的:

Koalas项目基于Apache Spark实现了pandas DataFrame API,从而使数据科学家能够更有效率的处理大数据。如果你已经熟悉pandas,那么你不需要付出任何学习成本就能使用Spark
一份代码可以同时在pandas(用于测试,小数据集)和Spark(用于分布式datasets)两个平台上运行。


本文我将介绍Koalas为什么值得你去尝试。只需要修改不到1%的pandas代码,我们就能将我们以前的代码跑在Koalas和Spark上。我们将处理时间提升了10倍,从几小时下降到了几分钟。并且我们具备了水平扩展的能力,这使我们能够处理更多的数据。

快速开始

在安装Koalas之前,首先我们需要一个能够运行PySpark的Spark集群。然后我们执行以下命令:

pip install koalas

如果使用conda,则执行以下命令:

conda install koalas -c conda-forge

更详细的信息可以查看Koalas的Readme文档。

安装完之后我们执行一个快速测试:

import databricks.koalas as ks
kdf = ks.DataFrame({'column1':[4.0, 8.0]}, {'column2':[1.0, 2.0]})
kdf

可以看到Koalas渲染出来了pandas风格的interactive tables,这太便利了。

基本操作示例:

首先我们生成了一个含有4列多行的测试数据:

import pandas as pd
## generate 1M rows of test data
pdf = generate_pd_test_data( 1e6 )
pdf.head(3)
>>> timestamp pod_id trip_id speed_mph
0 7.522523 pod_13 trip_6 79.340006
1 22.029855 pod_5 trip_22 65.202122
2 21.473178 pod_20 trip_10 669.901507


  • 免责声明:这只是我们随机生成的一些性能测试数据,不是我们真实的数据。完成的测试脚本在这里:https://gist.github.com/patryk-oleniuk/043f97ae9c405cbd13b6977e7e6d6fbc


我们先描述下我们要处理的业务,比如:每个pod-trip的时间是多少。

这需要进行以下操作:

  1. Group by ['pod_id','trip id']

  2. 对于每个trip, 计算trip_time,计算逻辑为:last timestamp – first timestamp

  3. 计算pod-trip时间分布 (mean, stddev)

Pandas方式(代码短,速度慢):

(snippet #1)

import pandas as pd
# take the grouped.max (last timestamp) and join with grouped.min (first timestamp)
gdf = pdf.groupby(['pod_id','trip_id']).agg({'timestamp': ['min','max']})
gdf.columns = ['timestamp_first','timestamp_last']
gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
# calculate the statistics on trip times
pd_result = gdf.describe()

使用PySpark方式(代码长,速度快):

(snippet #2)

import pyspark as spark
# import pandas df to spark (this line is not used for profiling)
sdf = spark.createDataFrame(pdf)
# sort by timestamp and groupby
sdf = sdf.sort(desc('timestamp'))
sdf = sdf.groupBy("pod_id", "trip_id").agg(F.max('timestamp').alias('timestamp_last'), F.min('timestamp').alias('timestamp_first'))
# add another column trip_time_sec as the difference between first and last
sdf = sdf.withColumn('trip_time_sec', sdf2['timestamp_last'] - sdf2['timestamp_first'])
sdf = sdf.withColumn('trip_time_hours', sdf3['trip_time_sec'] / 3600.0)
# calculate the statistics on trip times
sdf4.select(F.col('timestamp_last'),F.col('timestamp_first'),F.col('trip_time_sec'),F.col('trip_time_hours')).summary().toPandas()

使用Koalas方式(代码短,速度快):

(snippet #3)

import databricks.koalas as ks
# import pandas df to koalas (and so also spark) (this line is not used for profiling)
kdf = ks.from_pandas(pdf)
# the code below is the same as the pandas version
gdf = kdf.groupby(['pod_id','trip_id']).agg({'timestamp': ['min','max']})
gdf.columns = ['timestamp_first','timestamp_last']
gdf['trip_time_sec'] = gdf['timestamp_last'] - gdf['timestamp_first']
gdf['trip_time_hours'] = gdf['trip_time_sec'] / 3600.0
ks_result = gdf.describe().to_pandas()

注意第一段和第三段中的代码基本上一致,可以无缝迁移到Spark上面。对于大多数pandas代码,你只需要将import pandas改为databricks.koalas as pd,对于某些脚本需要微调还有些限制我们下面将会提到。

结果:
All the snippets have been verified to return the same pod-trip-times results. The describe and summary methods for pandas and Spark are slightly different, as explained here but this should not affect performance too much.
上面三段代码经过验证都返回了一致的结果。pandas和spark的describe
summary
方法不太一样(https://www.kdnuggets.com/2016/01/python-data-science-pandas-spark-dataframe-differences.html)但是这不影响太多性能。

示例的结果

进阶示例:UDFs和复杂操作

接下来我们用相同的的datafrme来尝试解决负责的计算,从而对比出pandas和Koalas的实现上的不同。

目标:分析每个pod-trip的平均速度:

  1. Group by ['pod_id','trip id']

  2. 通过查找下方的速度(时间)表(计算方法连接 

    https://www.quora.com/How-do-I-find-the-total-distance-covered-from-a-velocity-time-graph 计算出每个pod-trip的总距离。

  3. 对group过的df使用timestamp
    列排序。

  4. 计算timestamps的差值。

  5. 将速度和时间戳的差值相乘。-- 这样可以得到在一段时间之内的行驶距离。

  6. 对于distance_travelled
    进行sum计算 – 这样算出每个pod-trip的距离。

  7. 计算 trip time
     as timestamp.last – timestamp.first

  8. 计算 average_speed
     as distance_travelled / trip time

  9. 计算pod-trip时间的分布 (mean, stddev)。


    我们使用custom apply function和UDF (user defined functions)来实现上面的计算过程。

使用pandas的方式:

(snippet #4)

import pandas as pd
def calc_distance_from_speed( gdf ):
gdf = gdf.sort_values('timestamp')
gdf['time_diff'] = gdf['timestamp'].diff()
return pd.DataFrame({
'distance_miles':[ (gdf['time_diff']*gdf['speed_mph']).sum()],
'travel_time_sec': [ gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0] ]
})
results = df.groupby(['pod_id','trip_id']).apply( calculate_distance_from_speed)
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

PySpark的方式:

(snippet #5)

import databricks.koalas as ks
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pyspark.sql.functions as F
schema = StructType([
StructField("pod_id", StringType()),
StructField("trip_id", StringType()),
StructField("distance_miles", DoubleType()),
StructField("travel_time_sec", DoubleType())
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calculate_distance_from_speed( gdf ):
gdf = gdf.sort_values('timestamp')
print(gdf)
gdf['time_diff'] = gdf['timestamp'].diff()
return pd.DataFrame({
'pod_id':[gdf['pod_id'].iloc[0]],
'trip_id':[gdf['trip_id'].iloc[0]],
'distance_miles':[ (gdf['time_diff']*gdf['speed_mph']).sum()],
'travel_time_sec': [ gdf['timestamp'].iloc[-1]-gdf['timestamp'].iloc[0] ]
})
sdf = spark_df.groupby("pod_id","trip_id").apply(calculate_distance_from_speed)
sdf = sdf.withColumn('distance_km',F.col('distance_miles') * 1.609)
sdf = sdf.withColumn('avg_speed_mph',F.col('distance_miles')/ F.col('travel_time_sec') / 60.0)
sdf = sdf.withColumn('avg_speed_kph',F.col('avg_speed_mph') * 1.609)
sdf = sdf.orderBy(sdf.pod_id,sdf.trip_id)
sdf.summary().toPandas() # summary calculates almost the same results as describe

使用Koalas的方式:

(snippet #6)

import databricks.koalas as ks
def calc_distance_from_speed_ks( gdf ) -> ks.DataFrame[ str, str, float , float]:
gdf = gdf.sort_values('timestamp')
gdf['meanspeed'] = (gdf['timestamp'].diff()*gdf['speed_mph']).sum()
gdf['triptime'] = (gdf['timestamp'].iloc[-1] - gdf['timestamp'].iloc[0])
return gdf[['pod_id','trip_id','meanspeed','triptime']].iloc[0:1]


kdf = ks.from_pandas(df)
results = kdf.groupby(['pod_id','trip_id']).apply( calculate_distance_from_speed_ks)
# due to current limitations of the package, groupby.apply() returns c0 .. c3 column names
results.columns = ['pod_id', 'trip_id', 'distance_miles', 'travel_time_sec']
# spark groupby does not set the groupby cols as index and does not sort them
results = results.set_index(['pod_id','trip_id']).sort_index()
results['distance_km'] = results['distance_miles'] * 1.609
results['avg_speed_mph'] = results['distance_miles'] / results['travel_time_sec'] / 60.0
results['avg_speed_kph'] = results['avg_speed_mph'] * 1.609
results.describe()

Koalas是基于PySpark的pandas_udf
的来实现的,这就是为什么在定义function的时候需要定义类型提示。这个包的作者介绍了新的类型提示,ks.DataFrame
ks.Series
。但是当前的实现有些笨重并且需要进行相应的修改才能达到相同的数据结果(改变列名,groupby key不返回)。不过上面这些行为都在文档中进行了适当的说明。

性能:

为了评估Koalas的性能,我们对不同的数据量进行了测试。

这个性能测试在Databricks的平台上进行,使用了如下的配置:

  • 1个Spark driver节点 (也用来跑pandas脚本): 8 CPU cores, 61GB RAM.

  • 15个Spark worker 节点: 4CPU cores, 30.5GB RAM (一共: 60CPUs / 457.5GB内存 ).

每次测试重复10次。

基础操作

当数据很少的时候,初始化的时间和数据传输的时间相对于计算时间要长的所以pandas速度快一点(标记a)。当有大量数据的时候,pandas的数据处理时间要多于使用Spark的方案(标记b),我们可以观察到Koalas比PySpark有一些性能损耗,但是随着数据量的增长这个差距在减小(标记c)。

UDFs

在UDF的评测中,PySpark和Koalas文档当中描述了使用UDF会带来性能的大幅下降,所以我们将测试数据的行数减少了100倍。在每个测试用例中,Koalas和PySpark具有相同的性能,这暗示着他们使用了相同的底层实现。在实验过程中,我们发现使用PySpark windows功能性能会得到提升,但是Koalas没有此实现,所以我们仅测试UDF。

讨论

如果你想使你的pandas代码能够具有处理更大数据量和水平扩展的能力,那么Koalas将是一个正确的选择。当快速切换到Koalas上后你可以通过调整你的Spark集群的规模来处理更大的数据量并且显而易见的缩短处理时间。而且性能和PySpark差不多(根据集群规模和数据量的规模,有5%到50%的损失)。

另一方面,Koalas API相对于原生的Spark有一些损耗。如果你对性能非常敏感,你可以考虑使用Scala来实现你的处理逻辑。

限制和不同

当你刚开始接触Koalas的时候,你需要知道“为什么这个没有实现?!” 当前Koalas的包仍然在开发过程中,并且有些pandas API没有实现,但是大多数目前没有的功能会在接下来的几个月中实现(比如说groupby.diff()
或者kdf.rename()

根据我作为这个项目的contributor的经验,有些功能使用Spark API实现起来过于复杂或者有较大性能损耗。比如:DataFrame.values
需要在单一节点的内存中处理整个数据集,所以这是不太合适甚至是不太可能的。取而代之如果你需要在driver中检索结果,你可以调用DataFrame.to_pandas()
或者DataFrame.to_numpy()

另外一个重要的事情是Koalas的执行链和pandas是不同的:当在dataframe上面执行一些操作,会将操作放入一个队列当中,只有需要results的时候才会真正执行完毕。比如:当调用kdf.head()
或者kdf.to_pandas()
操作的时候才会被真正执行。这可能会对一些没有使用过Spark的人造成困扰,因为pandas是按照一行一行来具体执行的。

结论

Koalas帮助我们快速的讲pandas代码迁移到Spark上。如果你在尝试扩展你的pandas代码,你可以尝试使用Koalas。如果你遇到某些问题或者需要更多的功能,你可以提issue  https://github.com/databricks/koalas/issues  给社区,我们将保障Koalas的活跃度并不断的改进。另外的,欢迎来一起贡献代码。

资源

  1. Koalas github: https://github.com/databricks/koalas

  2. Koalas documentation: https://koalas.readthedocs.io

  3. 文章中的代码: https://gist.github.com/patryk-oleniuk/043f97ae9c405cbd13b6977e7e6d6fbc .

  4. 原文链接:https://databricks.com/blog/2019/08/22/guest-blog-how-virgin-hyperloop-one-reduced-processing-time-from-hours-to-minutes-with-koalas.html








推荐阅读
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 投融资周报 | Circle 达成 4 亿美元融资协议,唯一艺术平台 A 轮融资超千万美元 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 在Linux系统中避免安装MySQL的简易指南
    在Linux系统中避免安装MySQL的简易指南 ... [详细]
  • 在C#编程中,数值结果的格式化展示是提高代码可读性和用户体验的重要手段。本文探讨了多种格式化方法和技巧,如使用格式说明符、自定义格式字符串等,以实现对数值结果的精确控制。通过实例演示,展示了如何灵活运用这些技术来满足不同的展示需求。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 在开发过程中,我最初也依赖于功能全面但操作繁琐的集成开发环境(IDE),如Borland Delphi 和 Microsoft Visual Studio。然而,随着对高效开发的追求,我逐渐转向了更加轻量级和灵活的工具组合。通过 CLIfe,我构建了一个高度定制化的开发环境,不仅提高了代码编写效率,还简化了项目管理流程。这一配置结合了多种强大的命令行工具和插件,使我在日常开发中能够更加得心应手。 ... [详细]
  • 在进行网络编程时,准确获取本地主机的IP地址是一项基本但重要的任务。Winsock作为20世纪90年代初由Microsoft与多家公司共同制定的Windows平台网络编程接口,为开发者提供了一套高效且易用的工具。通过Winsock,开发者可以轻松实现网络通信功能,并准确获取本地主机的IP地址,从而确保应用程序在网络环境中的稳定运行。此外,了解Winsock的工作原理及其API函数的使用方法,有助于提高开发效率和代码质量。 ... [详细]
  • Flutter 2.* 路由管理详解
    本文详细介绍了 Flutter 2.* 中的路由管理机制,包括路由的基本概念、MaterialPageRoute 的使用、Navigator 的操作方法、路由传值、命名路由及其注册、路由钩子等。 ... [详细]
  • 如何使用 `org.apache.poi.openxml4j.opc.PackagePart` 类中的 `loadRelationships()` 方法及其代码示例详解 ... [详细]
  • 本文详细介绍了在CentOS 6.5 64位系统上使用阿里云ECS服务器搭建LAMP环境的具体步骤。首先,通过PuTTY工具实现远程连接至服务器。接着,检查当前系统的磁盘空间使用情况,确保有足够的空间进行后续操作,可使用 `df` 命令进行查看。此外,文章还涵盖了安装和配置Apache、MySQL和PHP的相关步骤,以及常见问题的解决方法,帮助用户顺利完成LAMP环境的搭建。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
author-avatar
mobiledu2502929507
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有