热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Python:在pandas数据帧上使用多处理

我想在大型数据集上使用多处理来查找两个gps点之间的距离.我构建了一个测试集,但是我无法使用多处理来处理这个集合.importpandasaspdfrom

我想在大型数据集上使用多处理来查找两个gps点之间的距离.我构建了一个测试集,但是我无法使用多处理来处理这个集合.

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
return pd.DataFrame(
[ [grp, df.loc[c[0]].ser_no, df.loc[c[1]].ser_no, vincenty(df.loc[c[0], x], df.loc[c[1], x]) ] for grp,lst in df.groupby('co_nm').groups.items() for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])
if __name__ == '__main__':
pool = mp.Pool(processes = (mp.cpu_count() - 1))
pool.map(calc_dist, ['lat','lon'])
pool.close()
pool.join()

当发生此错误时,我在Windows7 Professional上使用Python 2.7.11和Ipython 4.1.2与Anaconda 2.5.0 64位.

runfile(‘C:/…/Desktop/multiprocessing test.py’, wdir=’C:/…/Desktop’)

Traceback (most recent call last):

File “”, line 1, in

runfile(‘C:/…/Desktop/multiprocessing test.py’, wdir=’C:/…/Desktop’)

File “C:…\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py”, line 699, in runfile

execfile(filename, namespace)

File “C:…\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py”, line 74, in execfile

exec(compile(scripttext, filename, ‘exec’), glob, loc)

File “C:/…./multiprocessing test.py”, line 33, in

pool.map(calc_dist, [‘lat’,’lon’])

File “C:…\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py”, line 251, in map

return self.map_async(func, iterable, chunksize).get()

File “C:…\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py”, line 567, in get

raise self._value

TypeError: Failed to create Point instance from 1.

def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value

解决方法:

怎么了

您的代码中的这一行:

pool.map(calc_dist, ['lat','lon'])

产生2个进程 – 一个运行calc_dist(‘lat’),另一个运行calc_dist(‘lon’).比较doc中的第一个示例.(基本上,pool.map(f,[1,2,3])使用下面列表中给出的参数调用f三次:f(1),f(2)和f( 3).)如果我没弄错,你的函数calc_dist只能被称为calc_dist(‘lat’,’lon’).它不允许并行处理.

我相信你想要在进程之间拆分工作,可能会将每个元组(grp,lst)发送到一个单独的进程.以下代码就是这样做的.

首先,让我们准备分裂:

grp_lst_args = list(df.groupby('co_nm').groups.items())
print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]

我们将发送这些元组中的每一个(这里,其中有三个)作为单独进程中函数的参数.我们需要重写函数,我们称之为calc_dist2.为方便起见,它的参数是一个元组,如calc_dist2((‘aa’,[0,1,2]))

def calc_dist2(arg):
grp, lst = arg
return pd.DataFrame(
[ [grp, df.loc[c[0]].ser_no, df.loc[c[1]].ser_no, vincenty(df.loc[c[0], ['lat','lon']], df.loc[c[1], ['lat','lon']]) ] for c in combinations(lst, 2)
],
columns=['co_nm','machineA','machineB','distance'])

现在来了多处理:

pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)

results是grp_lst_args中(grp,lst)的调用calc_dist2((grp,lst))的结果列表(此处为数据框).结果元素稍后连接到一个数据框.

print(results_df)
co_nm machineA machineB distance
0 aa 1 2 156.876149391 km
1 aa 1 3 313.705445447 km
2 aa 2 3 156.829329105 km
0 cc 8 9 156.060165391 km
1 cc 8 0 311.910998169 km
2 cc 9 0 155.851498134 km
0 bb 4 5 156.665641837 km
1 bb 4 6 313.214333025 km
2 bb 4 7 469.622535339 km
3 bb 5 6 156.548897414 km
4 bb 5 7 312.957597466 km
5 bb 6 7 156.40899677 km

顺便说一句,在Python 3中我们可以使用带构造:

with mp.Pool() as pool:
results = pool.map(calc_dist2, grp_lst_args)

更新

我只在linux上测试过这段代码.在linux上,只读数据框df可以被子进程访问,并且不会被复制到它们的内存空间,但我不确定它在Windows上是如何工作的.您可以考虑将df拆分为块(按co_nm分组)并将这些块作为参数发送到某个其他版本的calc_dist.


推荐阅读
author-avatar
zx15899966868
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有