作者:一切随缘2502885767 | 来源:互联网 | 2023-05-18 08:42
场景描述: 数据清洗过程中发现,Mongodb中单个集合(表)数据量比较大(千万),定时任务处理加工数据时,由于服务器内存有限,会在处理过程中,出现内存不足的异常,于是希望把单个
场景描述:
数据清洗过程中发现,Mongodb中单个集合(表)数据量比较大(千万),定时任务处理加工数据时,由于服务器内存有限,会在处理过程中,出现内存不足的异常,于是希望把单个集合进行拆分成多个,在网上找了相关文章,发现处理这种的场景的案例比较少,基本上考虑了如下3中方案来实现:
1. mongodb sharding 分片 ---还未研究透彻
2. mongodb 聚合查询,将结果封装成变量,循环遍历,写入新的集合 ---测试了下,效率较低 --文末会说明该方案,适用于小数据量
3. 自定义python脚本 ---自己动手,丰衣足食
处理方式:
这里采用了自己写脚本处理的方式,好了!逻辑比较简单,就是将源数据对象读取,循环遍历,到达自己指定的count计数后,开多线程,将符合条件的数据写入新集合(表)。
脚本内容:小伙伴们,可以根据自己实际需求修改脚本内相应的条件,有更好的实现方案,欢迎交流。---脚本运行依赖包 pymongo
---说明:由于pool.map(thread_insert_001, source_rows_list) map方法,没有找到传递多个参数的方式,因此脚本中if else和重复函数较多,封装的不好。
# -*- coding: utf-8 -*-
import datetime
import json
# from bson import ObjectId
from pymongo import MongoClient
from multiprocessing.pool import ThreadPool
# 将3千万条数据拆分成多个集合,每400万一组
# 大概拆分成8组
# mongodb settings
settings = {
"ip": '127.0.0.1', # ip
"port": 27007, # 端口
"db_name": "add_your_db_name", # 数据库名字
"set_name": "add_your_collection_name" , # 集合名字
"new_db_name": "add_your_target_db_name",
"target_set_name_001": "coll_001",
"target_set_name_002": "coll_002",
"target_set_name_003": "coll_003",
"target_set_name_004": "coll_004",
"target_set_name_005": "coll_005",
"target_set_name_006": "coll_006",
"target_set_name_007": "coll_007",
"target_set_name_008": "coll_008",
}
class MyMongoDB():
def __init__(self):
try:
self.conn = MongoClient(settings["ip"], settings["port"])
except Exception as e:
print(e)
self.db = self.conn[settings["db_name"]]
self.my_set = self.db[settings["set_name"]]
def insert(self, dic):
print("inser...")
if dic:
self.my_set.insert(dic)
else:
pass
def update(self, dic, newdic):
print("update...")
self.my_set.update(dic, newdic)
def delete(self, dic):
print("delete...")
self.my_set.remove(dic)
def dbfind(self, dic=None):
print("find...")
current_set = self.my_set
data = current_set.find()
return data
def thread_insert_001(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_001"]]
new_collect_set.insert(rows_dic)
def thread_insert_002(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_002"]]
new_collect_set.insert(rows_dic)
def thread_insert_003(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_003"]]
new_collect_set.insert(rows_dic)
def thread_insert_004(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_004"]]
new_collect_set.insert(rows_dic)
def thread_insert_005(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_005"]]
new_collect_set.insert(rows_dic)
def thread_insert_006(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_006"]]
new_collect_set.insert(rows_dic)
def thread_insert_007(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_007"]]
new_collect_set.insert(rows_dic)
def thread_insert_008(rows_dic):
conn_mongodb = MongoClient(settings["ip"], settings["port"])
new_db = conn_mongodb[settings["new_db_name"]]
new_collect_set = new_db[settings["target_set_name_008"]]
new_collect_set.insert(rows_dic)
def split_mg_dt_col_to_new_col():
# mongodb 旧集合数据 拆分 写入新集合
my_mongodb_obj = MyMongoDB()
data = my_mongodb_obj.dbfind()
# 开10个线程,根据服务器配置修改
pool = ThreadPool(10)
count = 1
source_rows_list = []
for result in data:
count += 1
source_rows_list.append(result)
# print(source_rows_list)
if count < 4000000:
if len(source_rows_list) == 2000:
print('11111111111111111')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_001, source_rows_list)
del source_rows_list[:]
elif 4000000 <= count < 8000000:
if len(source_rows_list) == 2000:
print('222222222222222222')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_002, source_rows_list)
del source_rows_list[:]
elif 8000000 <= count < 12000000:
if len(source_rows_list) == 2000:
print('333333333333333333')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_003, source_rows_list)
del source_rows_list[:]
elif 12000000 <= count < 16000000:
if len(source_rows_list) == 2000:
print('44444444444444444444')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_004, source_rows_list)
del source_rows_list[:]
elif 16000000 <= count < 20000000:
if len(source_rows_list) == 2000:
print('5555555555555555555555')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_005, source_rows_list)
del source_rows_list[:]
elif 20000000 <= count < 24000000:
if len(source_rows_list) == 2000:
print('666666666666666666666')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_006, source_rows_list)
del source_rows_list[:]
elif 24000000 <= count < 28000000:
if len(source_rows_list) == 2000:
print('7777777777777777777')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_007, source_rows_list)
del source_rows_list[:]
elif 28000000 <= count < 32000000:
if len(source_rows_list) == 2000:
print('88888888888888888')
print(count)
print(len(source_rows_list))
insert_dic = pool.map(thread_insert_008, source_rows_list)
del source_rows_list[:]
pool.close()
if __name__ == '__main__':
split_mg_dt_col_to_new_col()
拆分脚本-展开查看