作者:上海外国灬语大学-校友粉丝团 | 来源:互联网 | 2023-02-03 14:32
MongoDB中我的集合中的所有文档都具有相同的字段.我的目标是将它们加载到Python中pandas.DataFrame
或dask.DataFrame
.
我想通过并行化来加速加载过程.我的计划是生成几个进程或线程.每个进程都会加载一个集合的块,然后这些块将合并在一起.
如何使用MongoDB正确完成?
我尝试过类似PostgreSQL的方法.我最初的想法是在SQL查询中使用SKIP
和LIMIT
.它失败了,因为为每个特定查询打开的每个游标都从头开始读取数据表,只是跳过了指定的行数.所以我必须创建包含记录号的附加列,并在查询中指定这些数字的范围.
相反,MongoDB为每个文档分配唯一的ObjectID.但是,我发现不可能从另一个ObjectID中减去一个ObjectID,它们只能与排序操作进行比较:less,greater和equal.
另外,pymongo
返回支持索引操作的游标对象,并且有一些看似对我的任务有用的方法,比如count
,limit
.
用于Spark的MongoDB连接器以某种方式完成此任务.不幸的是,我对Scala并不熟悉,因此,我很难找到他们是如何做到的.
那么,从Mongo并行加载数据到python的正确方法是什么?
到目前为止,我已经达到了以下解决方案:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
# import other modules.
collection = get_mongo_collection()
cursor = collection.find({ })
def process_document(in_doc):
out_doc = # process doc keys and values
return pd.DataFrame(out_doc)
df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
但是,它看起来像是dask.dataframe.from_delayed
从传递的生成器内部创建一个列表,有效地在一个线程中加载所有集合.
更新.我在docs中发现,这种skip
方法pymongo.Cursor
从集合的开头也开始,就像PostgreSQL一样.同一页面建议在应用程序中使用分页逻辑.到目前为止,我发现的解决方案使用_id
了这个分类.但是,它们也存储在最后一次看到_id
,这意味着它们也可以在单个线程中工作.
Update2.我在官方的MongoDb Spark连接器中找到了分区器的代码:https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/ MongoPaginationPartitioner.scala#L32
看起来,最初这个分区器从集合中的所有文档中读取关键字段并计算值的范围.
Update3:我的解决方案不完整.
不起作用,从pymongo获取异常,因为dask似乎错误地处理了Collection
对象:
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in (***failed resolving arguments***)
81 return expr, {}
82 if isinstance(expr, (Iterator, list, tuple, set)):
---> 83 args, dasks = unzip((to_task_dask(e) for e in expr), 2)
84 args = list(args)
85 dsk = sharedict.merge(*dasks)
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
2342
2343 def __next__(self):
-> 2344 raise TypeError("'Collection' object is not iterable")
2345
2346 next = __next__
TypeError: 'Collection' object is not iterable
是什么引发了异常:
def process_document(in_doc, other_arg):
# custom processing of incoming records
return out_doc
def compute_id_ranges(collection, query, partition_size=50):
cur = collection.find(query, {'_id': 1}).sort('_id', pymongo.ASCENDING)
id_ranges = [cur[0]['_id']]
count = 1
for r in cur:
count += 1
if count > partition_size:
id_ranges.append(r['_id'])
count = 0
id_ranges.append(r['_id'])
return zip(id_ranges[:len(id_ranges)-1], id_ranges[1: ])
def load_chunk(id_pair, collection, query={}, projection=None):
q = query
q.update( {"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}} )
cur = collection.find(q, projection)
return pd.DataFrame([process_document(d, other_arg) for d in cur])
def parallel_load(*args, **kwargs):
collection = kwargs['collection']
query = kwargs.get('query', {})
projection = kwargs.get('projection', None)
id_ranges = compute_id_ranges(collection, query)
dfs = [ delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges ]
df = dd.from_delayed(dfs)
return df
collection = connect_to_mongo_and_return_collection_object(credentials)
# df = parallel_load(collection=collection)
id_ranges = compute_id_ranges(collection)
dedf = delayed(load_chunk)(id_ranges[0], collection)
load_chunk
直接调用时完美运行.但是,delayed(load_chunk)( blah-blah-blah )
如上所述,调用失败,异常.