热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记MongoDB单个数据集合拆分脚本

场景描述:  数据清洗过程中发现,Mongodb中单个集合(表)数据量比较大(千万),定时任务处理加工数据时,由于服务器内存有限,会在处理过程中,出现内存不足的异常,于是希望把单个

场景描述:

    数据清洗过程中发现,Mongodb中单个集合(表)数据量比较大(千万),定时任务处理加工数据时,由于服务器内存有限,会在处理过程中,出现内存不足的异常,于是希望把单个集合进行拆分成多个,在网上找了相关文章,发现处理这种的场景的案例比较少,基本上考虑了如下3中方案来实现:

    1. mongodb sharding 分片 ---还未研究透彻

    2. mongodb 聚合查询,将结果封装成变量,循环遍历,写入新的集合 ---测试了下,效率较低  --文末会说明该方案,适用于小数据量

    3. 自定义python脚本  ---自己动手,丰衣足食

处理方式:

    这里采用了自己写脚本处理的方式,好了!逻辑比较简单,就是将源数据对象读取,循环遍历,到达自己指定的count计数后,开多线程,将符合条件的数据写入新集合(表)。

脚本内容:小伙伴们,可以根据自己实际需求修改脚本内相应的条件,有更好的实现方案,欢迎交流。---脚本运行依赖包 pymongo

---说明:由于pool.map(thread_insert_001, source_rows_list) map方法,没有找到传递多个参数的方式,因此脚本中if else和重复函数较多,封装的不好。

# -*- coding: utf-8 -*-
import datetime
import json

# from bson import ObjectId
from pymongo import MongoClient
from multiprocessing.pool import ThreadPool


# 将3千万条数据拆分成多个集合,每400万一组
# 大概拆分成8组


# mongodb settings
settings = {
    "ip": '127.0.0.1',  # ip
    "port": 27007,  # 端口
    "db_name": "add_your_db_name",  # 数据库名字
    "set_name": "add_your_collection_name" ,  # 集合名字
    "new_db_name": "add_your_target_db_name",
    "target_set_name_001": "coll_001",
    "target_set_name_002": "coll_002",
    "target_set_name_003": "coll_003",
    "target_set_name_004": "coll_004",
    "target_set_name_005": "coll_005",
    "target_set_name_006": "coll_006",
    "target_set_name_007": "coll_007",
    "target_set_name_008": "coll_008",
}


class MyMongoDB():
    def __init__(self):
        try:
            self.conn = MongoClient(settings["ip"], settings["port"])
        except Exception as e:
            print(e)
        self.db = self.conn[settings["db_name"]]
        self.my_set = self.db[settings["set_name"]]

    def insert(self, dic):
        print("inser...")
        if dic:
            self.my_set.insert(dic)
        else:
            pass

    def update(self, dic, newdic):
        print("update...")
        self.my_set.update(dic, newdic)

    def delete(self, dic):
        print("delete...")
        self.my_set.remove(dic)

    def dbfind(self, dic=None):
        print("find...")
        current_set = self.my_set
        data = current_set.find()

        return data


def thread_insert_001(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_001"]]
    new_collect_set.insert(rows_dic)


def thread_insert_002(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_002"]]
    new_collect_set.insert(rows_dic)


def thread_insert_003(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_003"]]
    new_collect_set.insert(rows_dic)


def thread_insert_004(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_004"]]

    new_collect_set.insert(rows_dic)


def thread_insert_005(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_005"]]
    new_collect_set.insert(rows_dic)


def thread_insert_006(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_006"]]
    new_collect_set.insert(rows_dic)


def thread_insert_007(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_007"]]
    new_collect_set.insert(rows_dic)


def thread_insert_008(rows_dic):
    conn_mongodb = MongoClient(settings["ip"], settings["port"])
    new_db = conn_mongodb[settings["new_db_name"]]
    new_collect_set = new_db[settings["target_set_name_008"]]
    new_collect_set.insert(rows_dic)


def split_mg_dt_col_to_new_col():
    # mongodb 旧集合数据 拆分 写入新集合
    my_mongodb_obj = MyMongoDB()
    data = my_mongodb_obj.dbfind()

    # 开10个线程,根据服务器配置修改
    pool = ThreadPool(10)

    count = 1
    source_rows_list = []
    for result in data:
        count += 1
        source_rows_list.append(result)
        # print(source_rows_list)
        if count < 4000000:
            if len(source_rows_list) == 2000:
                print('11111111111111111')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_001, source_rows_list)
                del source_rows_list[:]
        elif 4000000 <= count < 8000000:
            if len(source_rows_list) == 2000:
                print('222222222222222222')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_002, source_rows_list)
                del source_rows_list[:]
        elif 8000000 <= count < 12000000:
            if len(source_rows_list) == 2000:
                print('333333333333333333')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_003, source_rows_list)
                del source_rows_list[:]
        elif 12000000 <= count < 16000000:
            if len(source_rows_list) == 2000:
                print('44444444444444444444')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_004, source_rows_list)
                del source_rows_list[:]
        elif 16000000 <= count < 20000000:
            if len(source_rows_list) == 2000:
                print('5555555555555555555555')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_005, source_rows_list)
                del source_rows_list[:]
        elif 20000000 <= count < 24000000:
            if len(source_rows_list) == 2000:
                print('666666666666666666666')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_006, source_rows_list)
                del source_rows_list[:]
        elif 24000000 <= count < 28000000:
            if len(source_rows_list) == 2000:
                print('7777777777777777777')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_007, source_rows_list)
                del source_rows_list[:]
        elif 28000000 <= count < 32000000:
            if len(source_rows_list) == 2000:
                print('88888888888888888')
                print(count)
                print(len(source_rows_list))
                insert_dic = pool.map(thread_insert_008, source_rows_list)
                del source_rows_list[:]

    pool.close()

if __name__ == '__main__':
    split_mg_dt_col_to_new_col()
拆分脚本-展开查看
推荐阅读
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • MongoDB Aggregates.group() 方法详解与编程实例 ... [详细]
  • 在2020年8月19日的深度分析中,我们探讨了HTML标签中同时存在`a`标签的`href`和`onclick`属性时的触发顺序问题。此外,还讨论了如何在一个自适应高度的父级`div`中,使两个子`div`中的一个固定高度为300px,另一个自动填充剩余空间的方法。最后,文章详细介绍了JavaScript异步加载的多种实现方式,包括但不限于`async`、`defer`属性以及动态脚本插入技术,为开发者提供了丰富的技术参考。 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 在使用关系型数据库时,通常需要通过用户名和密码进行身份验证才能访问数据。然而,MongoDB默认情况下并不强制要求这种身份验证机制,使得用户无需凭据即可访问并执行各种操作。虽然这一设计简化了初学者的上手过程,但也带来了显著的安全风险。为了提升MongoDB的连接安全性,本文将探讨多种策略与实践,包括启用身份验证、配置网络访问控制、加密通信以及定期审计安全设置,以确保数据库的安全性和数据的完整性。 ... [详细]
  • DHCP三层交换机设置方式全局模式和接口模式设置方式和命令resetsave回车输入yreboot输入n输入y重启后就恢复默认设置了默认用户名密码adminAdmin@huawei ... [详细]
  • 本文全面概述了MySQL的发展历程与演进。最初,我们旨在通过自定义的快速低级(ISAM)接口连接到表格,利用mSQL数据库系统。随着时间的推移,MySQL不仅在性能和稳定性上取得了显著提升,还引入了多种高级功能,如事务处理、存储过程和视图等,成为全球广泛使用的开源数据库管理系统之一。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • APKAnalyzer(1):命令行操作体验与功能解析
    在对apkChecker进行深入研究后,自然而然地关注到了Android Studio中的APK分析功能。将APK文件导入IDE中,系统会自动解析并展示其中各类文件的详细信息。官方文档提供了详细的命令行工具使用指南,帮助开发者快速上手。本文以一个RecyclerView的Adapter代理开源库为例,探讨了如何利用这些工具进行高效的APK分析。 ... [详细]
  • 随着越来越多的应用程序采用JSON格式作为响应数据,基于Spring Framework构建的服务端应用也广泛采用了这一实践。本文将详细介绍如何在Spring 4.x版本的MVC框架中配置和实现HTTP请求返回JSON数据流,涵盖相关配置、依赖管理和代码示例,帮助开发者高效地实现这一功能。 ... [详细]
  • SQL Server 数据导入过程中遇到错误:MySQL 文件导入失败分析与解决 ... [详细]
  • 使用PyQt5与OpenCV实现电脑摄像头的图像捕捉功能
    本文介绍了如何使用Python中的PyQt5和OpenCV库来实现电脑摄像头的图像捕捉功能。通过结合这两个强大的工具,用户可以轻松地打开摄像头并进行实时图像采集和处理。代码示例展示了如何初始化摄像头、捕获图像并将其显示在PyQt5的图形界面中。此外,还提供了详细的步骤说明和代码注释,帮助开发者快速上手并实现相关功能。 ... [详细]
  • 触发器是数据库中一种特殊类型的存储过程,其执行依赖于预定义的事件,而非直接调用。在数据库管理中,触发器主要用于实现数据完整性、自动化日志记录及复杂业务规则的执行。当对数据库中的表、视图等对象进行插入、更新或删除操作时,系统将自动激活相关的触发器,以确保数据的一致性和安全性。此外,通过合理设计和优化触发器,还可以显著提升数据库性能和响应速度。 ... [详细]
author-avatar
一切随缘2502885767
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有