作者:手浪用户2602925023 | 来源:互联网 | 2023-06-18 20:22
我有一个约2亿行,约10个分组变量和约20个变量相加的数据集,并且是一个约50GB的csv。我要做的第一件事是查看运行时是按顺序但按块顺序运行的。这有点复杂,因为某些groupby实际上处于另一个聚合级别的另一个数据集中,因此只有200mb。所以现在相关的代码如下:
group_cols = ['cols','to','group','by']
cols_to_summarize = ['cols','summarize']
groupbys = []
df = pd.read_csv("file/path/df.csv",chunksize=1000000)
for chunk in df:
chunk = chunk.merge(other_df,left_on="id",right_index=True,how="inner")
groupbys.append(chunk.groupby(group_cols)[cols_to_summarize].sum())
finalAgg = pd.concat(groupbys).groupby(group_cols)[cols_to_summarize].sum()
每个块大约需要5秒钟来处理,因此200个块大约需要15-20分钟。我正在使用的服务器具有16个内核,所以我希望在这里获得一些提速,如果可以将它提高到2-3分钟,那将是惊人的。
但是,当我尝试使用多进程时,我正在竭力使速度大大提高。基于我的谷歌搜索,我认为这将有助于读取CSV,但我想知道是否多个进程无法读取同一CSV,也许我应该先将其拆分?这是我尝试过的方法,它花了比顺序运行更长的时间:
def agg_chunk(start):
[pull in small dataset]
chunk = pd.read_csv("file/path/df.csv",skiprows=range(1,start+1),nrows=1000000)
chunk = chunk.merge(other_df,how="inner")
return chunk.groupby(group_cols)[cols_to_summarize].sum()
if __name__ == "__main__":
pool = mp.Pool(16)
r = list(np.array(range(200))*1000000)
groupbys = pool.map(agg_chunk,r)
finalAgg = pd.concat(groupbys).groupby(group_cols)[cols_to_summarize].sum()
有更好的方法吗?额外的[拉入小型数据集]块需要大约5秒钟,但是将每个进程的时间加倍,然后除以16,仍然应该是一个相当不错的提速对吗?相反,并行版本已运行了半个小时,但仍未完成。还有什么方法可以将数据集传递给每个过程,而不是让每个过程都重新创建一次?