作者:才女与尔同销万古愁 | 来源:互联网 | 2023-07-20 16:12
在MongoDB中使用批量操作或块操作「BulkWrite」在效率上有非常大的提升,适合大量写操作第一次尝试使用批量操作进行数据清洗,并且用PyMongo模拟了少量数
在MongoDB中使用批量操作或块操作「Bulk Write」在效率上有非常大的提升,适合大量写操作
第一次尝试使用批量操作进行数据清洗,并且用PyMongo模拟了少量数据来进行测试,构造50w条数据进行插入或更新操作。
模拟环境:
PyMongo 3.6.1
MongoDB 3.4.7
Python 3.6.4 :: Anaconda, Inc.
模拟数据项:
items = [
{'i': 0},
{'i': 1},
{'i': 2},
{'i': 3},
{'i': 4},
...
{'i': 500000},
]
按条插入/更新的情况如下:
方法 |
总数 |
单次条数 |
时间 |
语句 |
save |
50w |
1 |
00:02:54 |
db[‘test’].save(item) |
insert |
50w |
1 |
00:02:50 |
db[‘test’].insert(item) |
insert批量插入的情况如下:
方法 |
总数 |
单次条数 |
时间 |
语句 |
insert |
50w |
1k |
00:00:07 |
db[‘test’].insert(items) |
insert |
50w |
10k |
00:00:08 |
db[‘test’].insert(items) |
块操作的情况如下:
方法 |
总数 |
单次 |
时间 |
语句 |
bulk_write + InsertOne |
50w |
1k |
00:00:09 |
db[‘test’].bulk_write(list(map(InsertOne, items))) |
bulk_write + InsertOne |
50w |
10k |
00:00:07 |
db[‘test’].bulk_write(list(map(InsertOne, items))) |
bulk_write + InsertOne |
50w |
50w |
00:00:09 |
db[‘test’].bulk_write(list(map(InsertOne, items))) |
bulk_write + ReplaceOne |
50w |
1k |
00:00:20 |
db[‘test’].bulk_write(list(map(lambda item: ReplaceOne({‘_id’: item[‘_id’]}, item, upsert=True), items))) |
bulk_write + ReplaceOne |
50w |
10k |
00:00:21 |
db[‘test’].bulk_write(list(map(lambda item: ReplaceOne({‘_id’: item[‘_id’]}, item, upsert=True), items))) |
bulk_write + ReplaceOne |
50w |
50w |
00:00:22 |
db[‘test’].bulk_write(list(map(lambda item: ReplaceOne({‘_id’: item[‘_id’]}, item, upsert=True), items))) |
bulk_write + UpdateOne |
50w |
1k |
00:00:20 |
db[‘test’].bulk_write(list(map(lambda item: UpdateOne({‘_id’: item[‘_id’]}, {‘$set’: {‘i’: item[‘i’]}}, upsert=True),items))) |
bulk_write + UpdateOne |
50w |
10k |
00:00:21 |
db[‘test’].bulk_write(list(map(lambda item: UpdateOne({‘_id’: item[‘_id’]}, {‘$set’: {‘i’: item[‘i’]}}, upsert=True),items))) |
bulk_write + UpdateOne |
50w |
50w |
00:00:22 |
db[‘test’].bulk_write(list(map(lambda item: UpdateOne({‘_id’: item[‘_id’]}, {‘$set’: {‘i’: item[‘i’]}}, upsert=True),items))) |
bulk_write + UpdateOne + InsertOne |
100w |
10k |
00:00:38 |
db[‘test’].bulk_write(list(map(InsertOne, items1)) + list(map(lambda item: UpdateOne({‘_id’: item[‘_id’]}, {‘$set’: {‘i’: 0}}, upsert=True),items2))) |
模拟代码如下:
import pymongo
import time
from pymongo import InsertOne, ReplaceOne, UpdateOne
from pymongo.errors import BulkWriteError
settings = {
'MONGO_HOST': "***",
'MONGO_PORT': ***,
'MONGO_DB': "***",
'MONGO_USER': "***",
'MONGO_PSW': "***",
}
client = pymongo.MongoClient(host=settings['MONGO_HOST'],port=settings['MONGO_PORT'])
client.admin.authenticate(settings['MONGO_USER'], settings['MONGO_PSW'],mechanism='SCRAM-SHA-1')
db = client[settings['MONGO_DB']]
l1 = []
for i in range(500000, 1000001):
l1.append({'i': i})
l2 = list(db['test'].find({}))
start_time = time.time()
page = 0
count = 10000
while True:
skip = page * count
page = page + 1
items1 = l1[skip:skip + count]
items2 = l2[skip:skip + count]
items = list(map(InsertOne, items1)) + list(map(InsertOne, items1))
try:
db['test'].bulk_write( \
list(map(InsertOne, items1)) + \
list(map(lambda item: UpdateOne({'_id': item['_id']}, {'$set': {'i': 0}}, upsert=True),items2)))
except BulkWriteError as bwe:
print(bwe.details)
else:
print(page)
if page == 50:
break
end_time = time.time()
consume_time = end_time - start_time
consume_time = '{:0>2s}'.format(str(int(consume_time
+ ':{:0>2s}'.format(str(int((consume_time
+ ':{:0>2s}'.format(str(int(consume_time % 60)))
print(consume_time)
注意:bulk_write(list)传入的list不能为空,会出现报错信息。
经过测试,可以看到批量操作与单条操作的写入效率相差非常大,Insert批量插入与Bulk Write快操作效率基本相同。
但bulk_write()可以将增删改操作合在一起,具有更好的灵活性。
吐槽:手贱循环了一个亿的数据进列表,系统直接跑死机了,PyCharm/SecureCRT/Studio 3T环境全部崩溃,连搜狗输入法都崩了!!!摔!!!