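I want to use multiprocessing on a large dataset to compute the distance between pairs of GPS points. I built a test set, but I can't get multiprocessing to work on it.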
import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp
df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def calc_dist(x):
    return pd.DataFrame(
        [[grp, df.loc[c[0]].ser_no, df.loc[c[1]].ser_no, vincenty(df.loc[c[0], x], df.loc[c[1], x])]
         for grp, lst in df.groupby('co_nm').groups.items()
         for c in combinations(lst, 2)],
        columns=['co_nm', 'machineA', 'machineB', 'distance'])

if __name__ == '__main__':
    pool = mp.Pool(processes=(mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat', 'lon'])
    pool.close()
    pool.join()
The error occurs on Windows 7 Professional, using Python 2.7.11 and IPython 4.1.2 with Anaconda 2.5.0 64-bit:
runfile('C:/…/Desktop/multiprocessing test.py', wdir='C:/…/Desktop')
Traceback (most recent call last):
  File "", line 1, in
    runfile('C:/…/Desktop/multiprocessing test.py', wdir='C:/…/Desktop')
  File "C:…\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 699, in runfile
    execfile(filename, namespace)
  File "C:…\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py", line 74, in execfile
    exec(compile(scripttext, filename, 'exec'), glob, loc)
  File "C:/…./multiprocessing test.py", line 33, in
    pool.map(calc_dist, ['lat','lon'])
  File "C:…\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "C:…\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py", line 567, in get
    raise self._value
TypeError: Failed to create Point instance from 1.
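For reference, this is the get method in multiprocessing/pool.py that re-raises the worker's exception in the parent process (the raise self._value at line 567 in the traceback):
def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value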
Answer:
What's wrong
This line of your code:
pool.map(calc_dist, ['lat','lon'])
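submits two tasks to the pool: one runs calc_dist('lat'), the other runs calc_dist('lon'). Compare the first example in the documentation. (Basically, pool.map(f, [1, 2, 3]) calls f three times, once with each argument in the list: f(1), f(2) and f(3).) As far as I can tell, your function calc_dist only works when called as calc_dist(['lat','lon']), because vincenty needs a (lat, lon) pair for each point. When it is called as calc_dist('lat'), df.loc[c[0], x] is a single number such as 1, which is exactly why you get TypeError: Failed to create Point instance from 1. And a single calc_dist(['lat','lon']) call does all the work in one process, so it does not allow any parallel processing.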
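A minimal sketch of how map dispatches its arguments, assuming Python 3. It uses multiprocessing.dummy.Pool, the thread-backed pool that exposes the same map() interface as multiprocessing.Pool, so the demo runs without pickling or a __main__ guard; describe is a hypothetical stand-in for calc_dist.

```python
# Minimal sketch: Pool.map calls the function once per element of the iterable.
# multiprocessing.dummy.Pool is a thread pool with the same map() API as
# multiprocessing.Pool; it is used here only so the demo needs no pickling.
from multiprocessing.dummy import Pool


def describe(x):
    """Hypothetical stand-in for calc_dist: report the argument it received."""
    return "called with %r" % (x,)


with Pool(2) as pool:
    # Two tasks, describe('lat') and describe('lon'); results come back in input order.
    calls = pool.map(describe, ['lat', 'lon'])

print(calls)
```

Each call receives one column name on its own, which is why calc_dist never sees a (lat, lon) pair.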
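Solution
I believe you want to split the work among processes, perhaps by sending each (grp, lst) tuple to a separate process. The following code does exactly that.
First, let's prepare the split: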
grp_lst_args = list(df.groupby('co_nm').groups.items())
print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
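We will send each of these tuples (three of them here) as the argument of a function running in a separate process. For that we need to rewrite the function; let's call it calc_dist2. For convenience, it takes a single tuple argument, as in calc_dist2(('aa', [0, 1, 2])).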
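To see what each task actually computes, here is a small sketch (plain Python 3, no pandas) that lists the row-index pairs calc_dist2 will visit for each (grp, lst) tuple. The tuples are copied from the printed grp_lst_args above; pair_counts is a hypothetical helper name.

```python
# Sketch: the row-index pairs calc_dist2 visits for each (grp, lst) tuple.
from itertools import combinations

# Same (grp, lst) tuples as printed above.
grp_lst_args = [('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]

pair_counts = {}
for grp, lst in grp_lst_args:
    pairs = list(combinations(lst, 2))  # each unordered pair exactly once
    pair_counts[grp] = len(pairs)
    print(grp, pairs)
```

A group of n rows yields n * (n - 1) / 2 pairs, so 'aa' and 'cc' produce 3 rows each and 'bb' produces 6, matching the 12 rows of the final result.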
def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
        [[grp, df.loc[c[0]].ser_no, df.loc[c[1]].ser_no,
          vincenty(df.loc[c[0], ['lat', 'lon']], df.loc[c[1], ['lat', 'lon']])]
         for c in combinations(lst, 2)],
        columns=['co_nm', 'machineA', 'machineB', 'distance'])
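The key change from calc_dist is the column selector. A short pandas sketch (a tiny hypothetical two-row frame, not the question's data) shows that a single label returns one number, while a list of labels returns the two values vincenty needs:

```python
# Sketch: a single column label gives one number; a list of labels gives a pair.
import pandas as pd

df = pd.DataFrame({'lat': [1, 2], 'lon': [21, 22]})

one_value = df.loc[0, 'lat']       # a scalar: what calc_dist('lat') hands to vincenty
point = df.loc[0, ['lat', 'lon']]  # a two-entry Series: what calc_dist2 hands to vincenty
print(one_value, point.tolist())
```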
Now comes the multiprocessing:
pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()
results_df = pd.concat(results)
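results is a list holding the result of calc_dist2((grp, lst)) (here, a data frame) for each (grp, lst) in grp_lst_args, in the same order. These data frames are then concatenated into a single data frame.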
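pd.concat keeps each piece's own 0-based index, which is why the index in the printed results_df restarts at 0 for every group. A minimal sketch with two hypothetical per-group frames (Python 3, pandas) shows this and the ignore_index=True option for a continuous index:

```python
# Sketch: pd.concat keeps each piece's own index; ignore_index renumbers it.
import pandas as pd

# Hypothetical stand-ins for two per-group DataFrames returned by calc_dist2.
results = [
    pd.DataFrame({'co_nm': ['aa', 'aa'], 'machineA': [1, 1], 'machineB': [2, 3]}),
    pd.DataFrame({'co_nm': ['cc'], 'machineA': [8], 'machineB': [9]}),
]

stacked = pd.concat(results)                        # original indexes kept
renumbered = pd.concat(results, ignore_index=True)  # fresh 0..n-1 index

print(stacked.index.tolist())
print(renumbered.index.tolist())
```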
print(results_df)
co_nm machineA machineB distance
0 aa 1 2 156.876149391 km
1 aa 1 3 313.705445447 km
2 aa 2 3 156.829329105 km
0 cc 8 9 156.060165391 km
1 cc 8 0 311.910998169 km
2 cc 9 0 155.851498134 km
0 bb 4 5 156.665641837 km
1 bb 4 6 313.214333025 km
2 bb 4 7 469.622535339 km
3 bb 5 6 156.548897414 km
4 bb 5 7 312.957597466 km
5 bb 6 7 156.40899677 km
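By the way, in Python 3 we can use a with statement. Leaving the block calls pool.terminate(), which is harmless here because map() has already returned every result:
with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)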
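Update
I have only tested this code on Linux. On Linux the child processes are created with fork, so they can read the read-only data frame df directly: it is shared copy-on-write rather than copied into their memory space. I'm not sure how well this works on Windows, which has no fork: there each child starts a fresh interpreter and re-imports the main script, so it only sees a df that the script rebuilds at module level. You could consider splitting df into chunks (grouped by co_nm) and sending those chunks as arguments to some other version of calc_dist.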
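Here is a minimal sketch of that chunked version, assuming Python 3 and pandas. calc_dist_chunk, build_df and haversine_km are hypothetical names. Because vincenty is not available in current geopy releases, the sketch swaps in a plain haversine (spherical) distance so it runs without geopy; its values will differ slightly from the ellipsoidal vincenty figures above. Each (group name, sub-DataFrame) chunk is pickled and sent to a worker, so workers never depend on a global df.

```python
# Minimal sketch: each task receives its own (group name, sub-DataFrame) chunk.
import math
import multiprocessing as mp
from itertools import combinations

import pandas as pd


def haversine_km(p, q, radius_km=6371.0):
    """Great-circle distance between (lat, lon) points given in degrees.

    Assumption: a spherical stand-in for geopy's vincenty.
    """
    lat1, lon1 = map(math.radians, p)
    lat2, lon2 = map(math.radians, q)
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * radius_km * math.asin(math.sqrt(a))


def calc_dist_chunk(arg):
    """Hypothetical variant of calc_dist2 that receives its rows as an argument."""
    grp, chunk = arg
    rows = []
    # Every unordered pair of rows within this group.
    for (_, a), (_, b) in combinations(chunk.iterrows(), 2):
        rows.append([grp, a['ser_no'], b['ser_no'],
                     haversine_km((a['lat'], a['lon']), (b['lat'], b['lon']))])
    return pd.DataFrame(rows, columns=['co_nm', 'machineA', 'machineB', 'distance_km'])


def build_df():
    """The question's test data."""
    return pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                         'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                         'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                         'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})


if __name__ == '__main__':
    df = build_df()
    # groupby yields (name, sub-DataFrame) tuples; map pickles each one to a worker.
    chunks = list(df.groupby('co_nm'))
    with mp.Pool() as pool:
        results = pool.map(calc_dist_chunk, chunks)
    print(pd.concat(results, ignore_index=True))
```

The design choice is that the data travels with the task, so the same code behaves the same way whether the platform starts workers with fork or with a fresh interpreter.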