热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PythonMongoDB合表

一、原始表结构1、imsi表MongoDBEnterprisedb.trs_action_dzwl_zm.findOne(){_id:{imsi:46002938001

一、原始表结构

1、imsi表

MongoDB Enterprise > db.trs_action_dzwl_zm.findOne()
{
        "_id" : {
                "imsi" : "460029380018855",
                "start_time" : "2019-03-13 15:37:07"
        },
        "site_address" : "织里-大港路与G318交叉口",
        "xnetbar_wacode" : "EG-MIX-WL-4C-006",
        "imei" : "000000052052052",
        "device_longitude" : "120.275424",
        "device_latitude" : "30.838656",
        "tmsi" : "1552462627",
        "rssi" : "140",
        "band" : "40",
        "plmn" : "46000",
        "tel_number" : "1595028",
        "device_name" : "织里-大港路与G318交叉口-4G",
        "vendor_name" : "南京森根",
        "province" : "江苏省",
        "city" : "盐城市"
}

2、car表

MongoDB Enterprise > db.trs_action_car_info.findOne()
{
        "_id" : {
                "license_number" : "苏A39NX7",
                "start_time" : "2019-05-16 23:03:13"
        },
        "site_address" : "湖织大道-香圩桥东侧",
        "site_location_id" : "",
        "unlawful_act" : "",
        "driving_direct" : "其它",
        "lane_id" : "001",
        "netbar_wacode" : "904",
        "license_color" : "002",
        "photo_cnt" : "",
        "monitor_type" : "卡口式监控",
        "photo_path" : "/pic?did=12ffaa00-78a3-1037-921c-54c4150760be&bid=486472&pid=4294966623&ptime=1558018994",
        "speed" : "0",
        "stat" : "0",
        "vehicle_brand1" : "0",
        "vehicle_brand2" : "0",
        "car_length" : "",
        "car_color" : "其它颜色",
        "shade" : "000",
        "car_type" : "轿车",
        "license_type" : "92式民用车",
        "vehicle_feature_path" : "",
        "device_name" : "湖织大道-香圩桥东侧",
        "monitor_direct" : "未知",
        "lane" : "001",
        "device_longitude" : "120.308512",
        "device_latitude" : "30.881026",
        "site_name" : "湖织大道-香圩桥东侧",
        "road_segment_direct" : "未知",
        "site_longitude" : "120.308512",
        "site_latitude" : "30.881026"
}

3、face表

MongoDB Enterprise > db.trs_action_face_info.findOne()
{
        "_id" : {
                "pid" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
                "start_time" : "2019-06-13 12:32:59"
        },
        "site_address" : "融泰宾馆",
        "img_mode" : "",
        "obj_img_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42",
        "quality_score" : "0.883593",
        "netbar_wacode" : "33052802001310942740",
        "device_name" : "融泰宾馆",
        "device_longitude" : "120.262211",
        "device_latitude" : "30.841749",
        "age" : "",
        "gender" : "1",
        "race" : "",
        "beard" : "",
        "eye_open" : "",
        "eye_glass" : "",
        "sun_glass" : "1",
        "mask" : "",
        "mouth_open" : "",
        "smile" : "1",
        "similarity" : "0.97059",
        "image_id" : "0120_1561570383884_d61beb5b9e644ed081f4ffc5e362ece7",
        "bkg_url" : "/pic?=d4=i778z096as091-706105m6ep=t1i5i*d1=*ipd7=*9s8=42b8i2d05*717540c14-a563e27-1579*d-d0i806d8e42"
}

4、MAC表

二、合表后collectionsitetime结构

要求:将imsi、car、face、MAC(MAC暂时不合)四张表,将表中一些关键字段提取出来

1)以站点

2)以两分钟为间隔

3)一个document中,两分钟内最多只存200个关键数据

MongoDB Enterprise > db.collecsites.findOne()
{
        "_id" : ObjectId("5e159ef831d840f9482b2adc"),
        "timeline" : "2019-03-13 15:34:00",
        "site" : "织里-大港路与G318交叉口",
        "face" : [ ],
        "lpn" : [ ],
        "mac" : [ ],
        "nsamples" : 200,
        "imsi" : [
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460078995442766"
                },
                {
                        "start_time" : "2019-03-13 15:35:56",
                        "imsi" : "460006254007976"
                }
        ]
}

三、开发脚本

1、使用到python模块

from multiprocessing import Pool(进程池)

from pymongo import MongoClient(python连接mongodb驱动)

import pandas as pd(将一段时间划分为多个时间段,本例子以2分钟一个时间段)

2、脚本

1)连接mongodb的脚本

[root@mongodb07 python3]# cat mongodbclient.py
#coding=utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
class Database(object):
    def __init__(self, address, port, database):
        self.cOnn= MongoClient(host=address, port=port)
        self.db = self.conn[database]
    def get_state(self):
        return self.conn is not None and self.db is not None
    def insert_one(self, collection, data):
        if self.get_state():
            ret = self.db[collection].insert_one(data)
            return ret.inserted_id
        else:
            return ""
    def insert_many(self, collection, data):
        if self.get_state():
            ret = self.db[collection].insert_many(data)
            return ret.inserted_id
        else:
            return ""
    def update(self, collection, data):
        # data format:
        # {key:[old_data,new_data]}
        data_filter = {}
        data_revised = {}
        for key in data.keys():
            data_filter[key] = data[key][0]
            data_revised[key] = data[key][1]
        if self.get_state():
            return self.db[collection].update_many(data_filter, {"$set": data_revised}).modified_count
        return 0
    def updateOne(self, collection, data_filter,data_revised):
        if self.get_state():
            return self.db[collection].update(data_filter,data_revised,True)
        return 0

    def find(self, col, condition, column=None):
        if self.get_state():
            if column is None:
                return self.db[col].find(condition)
            else:
                return self.db[col].find(condition, column)
        else:
            return None
    def aggregate(self, col, condition):
        if self.get_state():
            optiOns= {‘allowDiskUse‘:True}
            result=self.db[col].aggregate(condition,**options)
            return result
        else:
            return None

    def delete(self, col, condition):
        if self.get_state():
            return self.db[col].delete_many(filter=condition).deleted_count
        return 0
    def close_connect(self):
        self.conn.close()
        #return ‘mongo连接已关闭‘
2)对mongodb中collection做实际操作的脚本
[root@mongodb07 python3]# cat collection_curd.py
#coding:utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
def max_number(num1,num2,num3):   ##获取最大值
    max_num=max(num1,num2,num3)
    return max_num
def site_cursor_to_list(myresult,colum):  ##将mongodb输出的cursor转换为python的list
    sitelist=[]
    for i in myresult:
        sitelist.append(i[colum])
    return sitelist
def list_Duplicate_removal(inlist):   ##去除重复值
    outlist=list(set(inlist))
    return outlist
def get_time_interval(str_start_time,str_end_time):  ##以2分钟为单位,将输入的时间范围切分
    time_interval=pd.date_range(str_start_time, str_end_time,freq=‘2 Min‘)
    return time_interval
def get_site(collection_name,str_start_time,str_end_time):  ##获取2分钟内imsi/face/lpn/mac的站点名称
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time}})
    db.close_connect()
    return site_cursor_to_list(myresult,"site_address")
def get_site_data(collection_name,str_start_time,str_end_time,site,colums):  ##根据条件:2分钟的起止时间、站点名、集合名、字段名,获取所需数据
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    myresult=db.find(collection_name, {"_id.start_time":{ "$gte":str_start_time,"$lt":str_end_time},"site_address":site},colums)
    db.close_connect()
    return myresult

def sitetime_insert(collection_name,site,str_start_time,imsi_sitetime,face_sitetime,car_sitetime,mac_sitetime):  ##将数据插入集合
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    db.insert_one(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,"imsi":imsi_sitetime,"face":face_sitetime,"lpn":car_sitetime,"mac":mac_sitetime})
    db.close_connect()
def sitetime_updateOne(collection_name,site,str_start_time,key,value):   ##将数据更新到集合中
    db = mongodbclient.Database("172.16.102.15", 27017, "idpad_zl")
    db.updateOne(collection_name,{"site":site,"timeline":str_start_time,"nsamples":200,key:[]},{"$set":{key:value}})
    db.close_connect()
#def sit_colse():
#    db.close_connect()
 
3)操作脚本
[root@mongodb07 python3]# cat collection_insert.py
#coding:utf-8
from multiprocessing import Pool
import os, time, random
import json
from datetime import datetime
from pymongo import MongoClient
import sys
import datetime
import mongodbclient
import pandas as pd
import collection_curd as curd
from multiprocessing import Pool
#update_exec(imsi_outlen_flo,"collecsites",‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
    if type_outlen_flo <=1.0:
        curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist)
    else:
        for x in range(type_outlen_int+1):
            if x==type_outlen_int:
                curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:type_max_len])
                #print(typelist)
            else:
                curd.sitetime_updateOne(collectionname,site,str_start_time,typelist,datalist[x*200:(x+1)*200])
                #print(typelist)
def data_exec(nums,time_interval):
    #start = time.time()
    #print("start_time : ",start)
    #time_interval=curd.get_time_interval(‘20190310‘,‘20191230‘)
    #for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
    #print("start : ",nums)
    str_start_time = datetime.datetime.strftime(time_interval[nums],‘%Y-%m-%d %H:%M:%S‘)  ##时间切片,每个切片的开始时间
    str_end_time = datetime.datetime.strftime(time_interval[nums+1],‘%Y-%m-%d %H:%M:%S‘)  ##时间切片,每个切片的结束时间
    #print(str_start_time,‘   ‘,str_end_time)
    #print("########################")
    #time.sleep(5)
    #exit()
    #sitelist=[]
    myresult_imsi_sit=curd.get_site("trs_action_dzwl_zm",str_start_time,str_end_time) ##获取2分钟内imsi的站点名称,并将站点名带入下面的循环
    myresult_car_sit=curd.get_site("trs_action_car_info",str_start_time,str_end_time) ##获取2分钟内car的站点名称,并将站点名带入下面的循环
    myresult_face_sit=curd.get_site("trs_action_face_info",str_start_time,str_end_time) ##获取2分钟内face的站点名称,并将站点名带入下面的循环
    myresult=myresult_imsi_sit+myresult_car_sit+myresult_face_sit
    #print(myresult)
    myresult=curd.list_Duplicate_removal(myresult) ##获取去重后的所有站点
    #print(myresult)
    #exit()
    if not myresult:
        pass
    else:
        for i in range(len(myresult)):
            site=myresult[i]
            #print(site)
            my_imsi_site_data=curd.get_site_data("trs_action_dzwl_zm",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据imsi
            my_car_site_data=curd.get_site_data("trs_action_car_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据car
            my_face_site_data=curd.get_site_data("trs_action_face_info",str_start_time,str_end_time,site,{"_id"}) ##获取这个站点、这段时间内的数据face
            imsidata=curd.site_cursor_to_list(my_imsi_site_data,"_id")
            cardata=curd.site_cursor_to_list(my_car_site_data,"_id")
            facedata=curd.site_cursor_to_list(my_face_site_data,"_id")
            #print(imsidata)
            imsi_outlen_int=len(imsidata)/200
            imsi_outlen_flo=len(imsidata)/200.0
            car_outlen_int=len(cardata)/200
            face_outlen_int=len(facedata)/200
            car_outlen_flo=len(cardata)/200.0
            face_outlen_flo=len(facedata)/200.0
            car_max_len=len(cardata)
            face_max_len=len(facedata)
            imsi_max_len=len(imsidata)
            #print("car_max_len:",car_outlen_int," ","face_max_len:",face_outlen_int," ","imsi_max_len:",imsi_outlen_int)
            max_mod_200=max(imsi_outlen_int,car_outlen_int,face_outlen_int)+1
            #print(max_mod_200)
            if imsi_outlen_flo>imsi_outlen_int or car_outlen_flo>car_outlen_int or face_outlen_flo>face_outlen_int:
                for i in range(max_mod_200):
                    curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
            else:
                for i in range(max_mod_200-1):
                    curd.sitetime_insert("collecsites",site,str_start_time,[],[],[],[])
            update_exec(imsi_outlen_flo,"collecsites",site,str_start_time,‘imsi‘,imsidata,imsi_outlen_int,imsi_max_len)
            update_exec(car_outlen_flo,"collecsites",site,str_start_time,‘lpn‘,cardata,car_outlen_int,car_max_len)
            update_exec(face_outlen_flo,"collecsites",site,str_start_time,‘face‘,facedata,face_outlen_int,face_max_len)
            #print(site)
            #exit()
            #curd.sit_colse
    #def update_exec(type_outlen_flo,collectionname,site,str_start_time,typelist,datalist,type_outlen_int,type_max_len):
    #end = time.time()
    #print("end_time : ",end)
    #print(‘ALL Insert Task runs %s(ms).‘ % ((end - start)*1000))

if __name__ == ‘__main__‘:
    start = time.time()
    p=Pool(30)
    #print("start_time : ",start)
    time_interval=curd.get_time_interval(‘20190310‘,‘20191230‘)
    for i in range(len(time_interval)-1):  ##从时间切片中,选取每一个切片时间段
        #print(i)
        #res=p.apply_async(data_exec,args=(i,))
        result=p.apply_async(data_exec, args=(i,time_interval))
    p.close()
    p.join()
    end = time.time()
    print("end_time : ",end)
    print(‘ALL Insert Task runs %s(ms).‘ % ((end - start)*1000))
 

四、开发中遇到的问题

1、如何将一段时间按照两分钟进行划分
2、实例化mongodb连接后,在脚本运行中,连接如何close
3、进程线程池的使用

Python MongoDB 合表


推荐阅读
  • 本文介绍了如何通过C#语言调用动态链接库(DLL)中的函数来实现IC卡的基本操作,包括初始化设备、设置密码模式、获取设备状态等,并详细展示了将TextBox中的数据写入IC卡的具体实现方法。 ... [详细]
  • 在1995年,Simon Plouffe 发现了一种特殊的求和方法来表示某些常数。两年后,Bailey 和 Borwein 在他们的论文中发表了这一发现,这种方法被命名为 Bailey-Borwein-Plouffe (BBP) 公式。该问题要求计算圆周率 π 的第 n 个十六进制数字。 ... [详细]
  • 本文将从基础概念入手,详细探讨SpringMVC框架中DispatcherServlet如何通过HandlerMapping进行请求分发,以及其背后的源码实现细节。 ... [详细]
  • 回顾两年前春节期间的一个个人项目,该项目原本计划参加竞赛,但最终作为练习项目完成。独自完成了从编码到UI设计的全部工作,尽管代码量不大,但仍有一定的参考价值。本文将详细介绍该项目的背景、功能及技术实现。 ... [详细]
  • SQL Server 存储过程实践任务(第二部分)
    本文档详细介绍了三个SQL Server存储过程的创建与使用方法,包括统计特定类型客房的入住人数、根据房间号查询客房详情以及删除特定类型的客房记录。 ... [详细]
  • 本文探讨了程序员这一职业的本质,认为他们是专注于问题解决的专业人士。文章深入分析了他们的日常工作状态、个人品质以及面对挑战时的态度,强调了编程不仅是一项技术活动,更是个人成长和精神修炼的过程。 ... [详细]
  • 本文介绍了SIP(Session Initiation Protocol,会话发起协议)的基本概念、功能、消息格式及其实现机制。SIP是一种在IP网络上用于建立、管理和终止多媒体通信会话的应用层协议。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • 在日常生活中,支付宝已成为不可或缺的支付工具之一。本文将详细介绍如何通过支付宝实现免费提现,帮助用户更好地管理个人财务,避免不必要的手续费支出。 ... [详细]
  • 我的读书清单(持续更新)201705311.《一千零一夜》2006(四五年级)2.《中华上下五千年》2008(初一)3.《鲁滨孙漂流记》2008(初二)4.《钢铁是怎样炼成的》20 ... [详细]
  • 本文详细介绍了C++中的构造函数,包括其定义、特点以及如何通过构造函数进行对象的初始化。此外,还探讨了转换构造函数的概念及其在不同情境下的应用,以及如何避免不必要的隐式类型转换。 ... [详细]
  • 本文详细介绍了iOS应用的生命周期,包括各个状态及其转换过程中的关键方法调用。 ... [详细]
  • 材料光学属性集
    材料光学属性集概述了材料在不同光谱下的光学行为,包括可见光透射率、太阳光透射率等关键参数。 ... [详细]
  • 在 Django 模型中,ForeignKey 的 on_delete 参数定义了当关联对象被删除时,当前模型实例的行为。本文详细解释了 on_delete 的各个选项及其具体作用。 ... [详细]
  • 本文提供了《汇编语言 第3版》中检测点11.2的详细参考答案,包括了各指令执行后的状态标志分析。 ... [详细]
author-avatar
君君6789_903
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有