一、原始表结构
1、imsi表
MongoDB Enterprise > db.trs_action_dzwl_zm.findOne()
{
"_id" : {
"imsi" : "460029380018855",
"start_time" : "2019-03-13 15:37:07"
},
"site_address" : "织里-大港路与G318交叉口",
"xnetbar_wacode" : "EG-MIX-WL-4C-006",
"imei" : "000000052052052",
"device_longitude" : "120.275424",
"device_latitude" : "30.838656",
"tmsi" : "1552462627",
"rssi" : "140",
"band" : "40",
"plmn" : "46000",
"tel_number" : "1595028",
"device_name" : "织里-大港路与G318交叉口-4G",
"vendor_name" : "南京森根",
"province" : "江苏省",
"city" : "盐城市"
}
2、car表
MongoDB Enterprise > db.trs_action_car_info.findOne()
{
"_id" : {
"license_number" : "苏A39NX7",
"start_time" : "2019-05-16 23:03:13"
},
"site_address" : "湖织大道-香圩桥东侧",
"site_location_id" : "",
"unlawful_act" : "",
"driving_direct" : "其它",
"lane_id" : "001",
"netbar_wacode" : "904",
"license_color" : "002",
"photo_cnt" : "",
"monitor_type" : "卡口式监控",
"photo_path" : "/pic?did=12ffaa00-78a3-1037-921c-54c4150760be&bid=486472&pid=4294966623&ptime=1558018994",
"speed" : "0",
"stat" : "0",
"vehicle_brand1" : "0",
"vehicle_brand2" : "0",
"car_length" : "",
"car_color" : "其它颜色",
"shade" : "000",
"car_type" : "轿车",
"license_type" : "92式民用车",
"vehicle_feature_path" : "",
"device_name" : "湖织大道-香圩桥东侧",
"monitor_direct" : "未知",
"lane" : "001",
"device_longitude" : "120.308512",
"device_latitude" : "30.881026",
"site_name" : "湖织大道-香圩桥东侧",
"road_segment_direct" : "未知",
"site_longitude" : "120.308512",
"site_latitude" : "30.881026"
}
3、face表
MongoDB Enterprise > db.trs_action_face_info.findOne()
{
"_id" : {
"pid" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
"start_time" : "2019-06-13 12:32:59"
},
"site_address" : "融泰宾馆",
"img_mode" : "",
"obj_img_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42",
"quality_score" : "0.883593",
"netbar_wacode" : "33052802001310942740",
"device_name" : "融泰宾馆",
"device_longitude" : "120.262211",
"device_latitude" : "30.841749",
"age" : "",
"gender" : "1",
"race" : "",
"beard" : "",
"eye_open" : "",
"eye_glass" : "",
"sun_glass" : "1",
"mask" : "",
"mouth_open" : "",
"smile" : "1",
"similarity" : "0.97059",
"image_id" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
"bkg_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42"
}
4、MAC表(暂未参与合表,表结构略)
二、合表后的集合(collecsites)结构
要求:从imsi、car、face、MAC(MAC暂时不合)四张表中提取关键字段,合并写入同一个集合,规则如下:
1)以站点(site_address)为单位进行分组
2)以两分钟为一个时间片
3)一个document中,每类数据最多只存200条;同一站点、同一时间片内超过200条时,拆分到多个document中
MongoDB Enterprise > db.collecsites.findOne()
{
"_id" : ObjectId("5e159ef831d840f9482b2adc"),
"timeline" : "2019-03-13 15:34:00",
"site" : "织里-大港路与G318交叉口",
"face" : [ ],
"lpn" : [ ],
"mac" : [ ],
"nsamples" : 200,
"imsi" : [
{
"start_time" : "2019-03-13 15:35:56",
"imsi" : "460078995442766"
},
{
"start_time" : "2019-03-13 15:35:56",
"imsi" : "460006254007976"
}
]
}
三、开发脚本
1、使用到python模块
from multiprocessing import Pool(进程池)
from pymongo import MongoClient(python连接mongodb驱动)
import pandas as pd(将一段时间划分为多个时间段,本例子以2分钟一个时间段)
2、脚本
1)连接mongodb的脚本
[root@mongodb07 python3]# cat mongodbclient.py
#coding=utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
class Database(object):
    """Small convenience wrapper around a pymongo ``MongoClient`` bound to one database.

    Every helper first checks ``get_state()`` and returns a neutral value
    (``""``, ``0`` or ``None``) instead of raising when the wrapper holds no
    client or database handle.
    """

    def __init__(self, address, port, database):
        """Open a client to ``address:port`` and select ``database``."""
        # The attribute must be named ``conn``: every other method reads
        # ``self.conn`` (the pasted original assigned ``self.cOnn``).
        self.conn = MongoClient(host=address, port=port)
        self.db = self.conn[database]

    def get_state(self):
        """Return True when both the client and the database handle are set."""
        # Compare with None explicitly rather than relying on truthiness of
        # the pymongo objects.
        return self.conn is not None and self.db is not None

    def insert_one(self, collection, data):
        """Insert one document; return its ``_id`` or ``""`` when not connected."""
        if self.get_state():
            ret = self.db[collection].insert_one(data)
            return ret.inserted_id
        return ""

    def insert_many(self, collection, data):
        """Insert several documents; return the list of new ``_id`` values.

        Returns ``""`` when not connected.
        """
        if self.get_state():
            ret = self.db[collection].insert_many(data)
            # The bulk result exposes ``inserted_ids`` (plural).
            return ret.inserted_ids
        return ""

    def update(self, collection, data):
        """Update every document matching the old values with the new values.

        ``data`` has the form ``{key: [old_value, new_value]}``: the old
        values build the filter and the new values are applied with ``$set``.
        Returns the number of modified documents, or 0 when not connected.
        """
        data_filter = {key: pair[0] for key, pair in data.items()}
        data_revised = {key: pair[1] for key, pair in data.items()}
        if self.get_state():
            result = self.db[collection].update_many(data_filter, {"$set": data_revised})
            return result.modified_count
        return 0

    def updateOne(self, collection, data_filter, data_revised):
        """Upsert a single document matching ``data_filter``.

        ``data_revised`` may be an update document (keys starting with ``$``)
        or a plain replacement document. Returns the raw result document
        reported for the write, or 0 when not connected.
        """
        if not self.get_state():
            return 0
        target = self.db[collection]
        # ``Collection.update`` no longer exists in current PyMongo; pick the
        # equivalent single-document call based on the shape of the argument.
        if any(str(key).startswith("$") for key in data_revised):
            result = target.update_one(data_filter, data_revised, upsert=True)
        else:
            result = target.replace_one(data_filter, data_revised, upsert=True)
        return result.raw_result

    def find(self, col, condition, column=None):
        """Return a cursor for ``condition``, optionally projecting ``column``.

        Returns ``None`` when not connected.
        """
        if not self.get_state():
            return None
        if column is None:
            return self.db[col].find(condition)
        return self.db[col].find(condition, column)

    def aggregate(self, col, condition):
        """Run the aggregation pipeline ``condition`` with ``allowDiskUse`` enabled.

        Returns ``None`` when not connected.
        """
        if not self.get_state():
            return None
        options = {"allowDiskUse": True}
        return self.db[col].aggregate(condition, **options)

    def delete(self, col, condition):
        """Delete all documents matching ``condition``; return how many were removed."""
        if self.get_state():
            return self.db[col].delete_many(filter=condition).deleted_count
        return 0

    def close_connect(self):
        """Close the underlying client."""
        self.conn.close()
2)对mongodb中collection做实际操作的脚本
[root@mongodb07 python3]# cat collection_curd.py
#coding:utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
def max_number(num1, num2, num3):
    """Return the largest of the three given values."""
    candidates = (num1, num2, num3)
    return max(candidates)
def site_cursor_to_list(myresult, colum):
    """Collect the value stored under ``colum`` from every item of ``myresult``.

    ``myresult`` is any iterable of mappings (for example a MongoDB cursor);
    the values are returned as a plain list in iteration order.
    """
    return [document[colum] for document in myresult]
def list_Duplicate_removal(inlist):
    """Return the distinct values of ``inlist`` as a new list.

    The values pass through a ``set``, so the order of the result is not
    related to the order of the input.
    """
    distinct_values = set(inlist)
    return list(distinct_values)
def get_time_interval(str_start_time, str_end_time):
    """Split the range [str_start_time, str_end_time] into two-minute steps.

    Both arguments are passed straight to ``pandas.date_range``. The result
    is a ``DatetimeIndex`` whose consecutive entries are two minutes apart,
    starting at ``str_start_time``; entry ``i`` and entry ``i + 1`` delimit
    one time slice.
    """
    # "2min" is the canonical pandas alias for a two-minute frequency.
    return pd.date_range(str_start_time, str_end_time, freq="2min")
def get_site(collection_name, str_start_time, str_end_time):
    """Return the ``site_address`` of every record in one time slice.

    Queries ``collection_name`` for documents whose ``_id.start_time`` lies in
    ``[str_start_time, str_end_time)`` and returns their ``site_address``
    values as a list (duplicates included).
    """
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        myresult = db.find(
            collection_name,
            {"_id.start_time": {"$gte": str_start_time, "$lt": str_end_time}},
            # Only the site name is used, so do not transfer whole documents.
            {"site_address": 1, "_id": 0},
        )
        # The cursor fetches lazily: read it completely while the client is
        # still open, and only then close the connection.
        return site_cursor_to_list(myresult, "site_address")
    finally:
        # Release the connection even if the query or the read fails.
        db.close_connect()
def get_site_data(collection_name, str_start_time, str_end_time, site, colums):
    """Fetch the records of one site within one time slice.

    Queries ``collection_name`` for documents whose ``_id.start_time`` lies in
    ``[str_start_time, str_end_time)`` and whose ``site_address`` equals
    ``site``, projecting the fields given in ``colums``.

    Returns the matching documents as a list (or ``None`` if the database
    wrapper returned no cursor). The result can be iterated exactly like the
    cursor that was returned previously.
    """
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        myresult = db.find(
            collection_name,
            {
                "_id.start_time": {"$gte": str_start_time, "$lt": str_end_time},
                "site_address": site,
            },
            colums,
        )
        if myresult is None:
            return None
        # Read everything before the client is closed; a cursor handed back
        # after close_connect() could no longer fetch its documents.
        return list(myresult)
    finally:
        # Release the connection even if the query or the read fails.
        db.close_connect()
def sitetime_insert(collection_name, site, str_start_time, imsi_sitetime, face_sitetime, car_sitetime, mac_sitetime):
    """Insert one bucket document for ``site`` and time slice ``str_start_time``.

    The document stores the site, the slice start (``timeline``), a fixed
    ``nsamples`` of 200 and the four data lists under the keys ``imsi``,
    ``face``, ``lpn`` and ``mac``.
    """
    document = {
        "site": site,
        "timeline": str_start_time,
        "nsamples": 200,
        "imsi": imsi_sitetime,
        "face": face_sitetime,
        "lpn": car_sitetime,
        "mac": mac_sitetime,
    }
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        db.insert_one(collection_name, document)
    finally:
        # Close the connection even when the insert raises.
        db.close_connect()
def sitetime_updateOne(collection_name, site, str_start_time, key, value):
    """Store ``value`` under ``key`` in one bucket document whose ``key`` is still empty.

    The filter selects a document of ``site`` / ``str_start_time`` with
    ``nsamples`` 200 whose ``key`` field is an empty list; ``value`` is then
    written to that field with ``$set`` through ``Database.updateOne``.
    """
    data_filter = {
        "site": site,
        "timeline": str_start_time,
        "nsamples": 200,
        # Only a bucket that has not received data of this type yet.
        key: [],
    }
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    try:
        db.updateOne(collection_name, data_filter, {"$set": {key: value}})
    finally:
        # Close the connection even when the update raises.
        db.close_connect()
#def sit_colse():
# db.close_connect()
3)操作脚本
[root@mongodb07 python3]# cat collection_insert.py
#coding:utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
import collection_curd as curd
from multiprocessing import Pool
#update_exec(imsi_outlen_flo,"collecsites",‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
def update_exec(type_outlen_flo, collectionname, site, str_start_time, typelist, datalist, type_outlen_int, type_max_len):
    """Write ``datalist`` into the bucket documents in chunks of at most 200.

    Each chunk is handed to ``curd.sitetime_updateOne`` for the field named
    by ``typelist`` (for example ``'imsi'``, ``'lpn'`` or ``'face'``).

    ``type_outlen_flo``, ``type_outlen_int`` and ``type_max_len`` are kept
    only so existing callers keep working; the chunk boundaries are derived
    from ``datalist`` itself, so a float chunk count (``len(x) / 200`` on
    Python 3) can no longer break ``range()``.

    Nothing is written for an empty ``datalist``, and no empty trailing chunk
    is written when the length is an exact multiple of 200.
    """
    chunk_size = 200  # maximum number of entries stored per bucket document
    for offset in range(0, len(datalist), chunk_size):
        chunk = datalist[offset:offset + chunk_size]
        curd.sitetime_updateOne(collectionname, site, str_start_time, typelist, chunk)
def data_exec(nums, time_interval):
    """Merge the imsi / car / face records of one time slice into ``collecsites``.

    The slice is delimited by ``time_interval[nums]`` (inclusive start) and
    ``time_interval[nums + 1]`` (exclusive end). For every site that has at
    least one record in the slice:

    1. the ``_id`` values of the site's imsi, car and face records are read;
    2. enough empty bucket documents are inserted so that each type fits with
       at most 200 entries per document;
    3. the values are written into those documents through ``update_exec``
       under the keys ``imsi``, ``lpn`` and ``face``.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    str_start_time = time_interval[nums].strftime(time_format)    # slice start
    str_end_time = time_interval[nums + 1].strftime(time_format)  # slice end

    # (source collection, target field in the merged document); the order is
    # imsi, car, face for both reading and writing.
    sources = (
        ("trs_action_dzwl_zm", 'imsi'),
        ("trs_action_car_info", 'lpn'),
        ("trs_action_face_info", 'face'),
    )
    bucket_size = 200

    # Every site seen by any of the three sources during this slice.
    sites = []
    for collection_name, _field in sources:
        sites.extend(curd.get_site(collection_name, str_start_time, str_end_time))
    sites = curd.list_Duplicate_removal(sites)

    for site in sites:
        per_type = []
        for collection_name, field in sources:
            records = curd.get_site_data(collection_name, str_start_time, str_end_time, site, {"_id"})
            per_type.append((field, curd.site_cursor_to_list(records, "_id")))

        # Buckets needed = the largest ceil(len / bucket_size) over all types,
        # computed with integer arithmetic so range() always gets an int.
        bucket_count = max(-(-len(values) // bucket_size) for _field, values in per_type)
        for _ in range(bucket_count):
            curd.sitetime_insert("collecsites", site, str_start_time, [], [], [], [])

        for field, values in per_type:
            update_exec(
                len(values) / float(bucket_size),
                "collecsites",
                site,
                str_start_time,
                field,
                values,
                len(values) // bucket_size,  # whole buckets, always an int
                len(values),
            )
if __name__ == '__main__':
    # Run data_exec for every two-minute slice of the range on a pool of
    # 30 worker processes and report the total running time.
    start = time.time()
    p = Pool(30)
    time_interval = curd.get_time_interval('20190310', '20191230')
    pending = []
    for i in range(len(time_interval) - 1):  # one task per time slice
        # Task arguments are pickled for the worker, so send only the two
        # boundary timestamps of this slice (addressed as index 0 and 1)
        # instead of the complete index for every task.
        pending.append(p.apply_async(data_exec, args=(0, time_interval[i:i + 2])))
    p.close()
    p.join()
    end = time.time()
    # apply_async stores a worker's exception in its result object; read the
    # results so that failed slices are reported instead of silently lost.
    for i, task in enumerate(pending):
        try:
            task.get()
        except Exception as exc:
            sys.stderr.write("slice %d (%s) failed: %r\n" % (i, time_interval[i], exc))
    print("end_time : ", end)
    print('ALL Insert Task runs %s(ms).' % ((end - start) * 1000))
四、开发中遇到的问题
1、如何将一段时间按照两分钟进行划分
2、实例化mongodb连接后,在脚本运行中,连接如何close
3、进程池(multiprocessing.Pool)的使用
Python MongoDB 合表