对于处理读取大批量MongoDB数据的需求,一般采用通过游标分批读取数据,逐批按需求处理数据(数据治理)方案,这样过程思维清晰,和Oracle、MySQL等传统数据库处理方式类似。
缺点是大数据量时速度较慢,而且需要调优游标批量处理量(batch_size),例如我处理读取100万条数据时,耗费我大概5天时间,而直接采用Pandas工具一次性读取MongoDB数据,仅仅用时了不到5分钟。
主要过程思路是Pandas一次性读取数据,数据接口仍是pymongo,直接创建DataFrame,构建Pandas数据表,特殊点是默认带“_id”,如下所示:
client = pymongo.MongoClient('mongodb://localhost:27017')
db = client["oildepot"]
collection = db["TCS_OilCanHistory"]
df = pd.DataFrame(list(collection.find({'OilStockCode':'K1060030002','MearsureTime':{'$gt':'2019/7/1'}},{'OilStockCode':1,'OilCanID':1})))
由于目前计算机内存都比较多,就不在乎多读入的内容,读入后再分别删除多余的列,替换数据标签、数据治理,主要过程参考如下:
(1)删除某列,可以特指“_id”:
df.drop(['_id'],axis=1, inplace=True)
(2)数据截断处理:
#按“.“截断时间字符串,取分割的第一个
df.loc[:,'MearsureTime'] = df['MearsureTime'].str.split('.',expand=True)[0]
(3)通过lambda表达式替换数据:
def function(key):
CanStatus = {'出油':-1,'静止':0,'进油':1}
return CanStatus.get(key)
#使用lambda表达式替换字符串为数值(油罐状态)
df.loc[:,'OilCanStatus'] = df.apply(lambda x: function(x.OilCanStatus), axis = 1)
(4)替换数据标签,放在最后,便于中间操作:
df.rename(columns={'OilStockCode':'油库编码','OilCanID':'油罐编码'},inplace = True)
最后,合并完整代码如下:
import pymongo
import pandas as pd
def get_data():
client = pymongo.MongoClient('mongodb://localhost:27017')
db = client["oildepot"]
collection = db["TCS_OilCanHistory"]
def function(key):
CanStatus = {
'出油':-1,'静止':0,'进油':1}
return CanStatus.get(key)
df = pd.DataFrame(list(collection.find({
'OilStockCode':'K1060030002','MearsureTime':{
'$gt':'2019/7/1'}},{
'OilStockCode':1,'OilCanID':1,'OilCode':1,
'WatchDensity':1,'MearsureTime':1,'WatchTemperature':1,'StandardDensity':1,'OilTemperature':1,'OilCanStatus':1,
'LiquidLevel':1,'LiquidVolume':1,'VCF':1,'OilWeight':1,'FloatingRoofSpeed':1,'OutletPipeSpeed':1,'FlowRate':1,
'InletPipeSpeed':1})))
df.drop(['_id'],axis=1, inplace=True)
#df['MearsureTime'].str[1:4]
#按“.“截断时间字符串,取分割的第一个
df.loc[:,'MearsureTime'] = df['MearsureTime'].str.split('.',expand=True)[0]
#使用lambda表达式替换字符串为数值(油罐状态)
df.loc[:,'OilCanStatus'] = df.apply(lambda x: function(x.OilCanStatus), axis = 1)
df.rename(columns={
'OilStockCode':'油库编码','OilCanID':'油罐编码','OilCode':'油品编码','WatchDensity':'视密度','MearsureTime':'采集时间',
'WatchTemperature':'视油温','StandardDensity':'密度','OilTemperature':'油温','OilCanStatus':'状态','LiquidLevel':'液位',
'LiquidVolume':'体积','VCF':'体积修正次数','OilWeight':'重量','FloatingRoofSpeed':'浮盘流速','OutletPipeSpeed':'出油管流度',
'FlowRate':'流速','InletPipeSpeed':'进油管流速'},inplace = True)
return df
def main():
df = get_data()
df.to_hdf("E:/data/OilCan2020_1.h5", key='df', mode='w', complevel=9) #, format='table')
if __name__ == '__main__':
main()
参考:
《Pandas(数据表)深入应用经验小结(查询、分组、上下行间计算等)》
CSDN博客 , 肖永威 ,2020年8月