热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用Python将MongoDB数据同步到Elasticsearch

使用Python将MongoDB数据同步到Elasticsearch版本说明:Python3.7PyMongo:3.11.0Elasticsearch

使用Python将MongoDB数据同步到Elasticsearch

版本说明:Python 3.7 PyMongo:3.11.0 Elasticsearch:5.5.3
话不多说直接鲁码(如遇到什么问题欢迎留言讨论)

# coding:utf8
# 将mongodb中的数据同步到Es中
from pymongo import MongoClient
from elasticsearch import Elasticsearch, helpers
import json
import logging# mongodb 数据库地址
CONN_ADDR1 = '更换为自己的MongoDB地址'
USERNAME = 'MongoDB用户名'
PASSWORD = 'MongoDB密码'
DB = "MongoDB的库"
COLLECTION = "MongoDB集合"# Es 数据库地址
class ElasticObj:def __init__(self, index_name, index_type, ip):""":param index_name: 索引名称:param index_type: 索引类型"""self.index_name = index_nameself.index_type = index_type# 无用户名密码状态# self.es = Elasticsearch([ip])# 用户名密码状态# 连接ESself.es = Elasticsearch([ip], http_auth=('ES用户名', 'ES密码'), port=9200)# def chaxun(self):#查询所有数据# db = self.client['xcc_company_name']# collection = db['name_A']# data_qiyes = collection.find({}, no_cursor_timeout=True)# return data_qiyes# 创建索引def create_index(self):'''创建索引,创建索引名称为ott,类型为ott_type的索引:param ex: Elasticsearch对象:return:'''# 创建映射_index_mappings = {"mappings": {self.index_type: {"properties": {"name": {'type': 'text'},"password": {'type': 'text'},"birthplace": {'type': 'text'}}}}}if self.es.indices.exists(index=self.index_name) is not True:res = self.es.indices.create(index=self.index_name, body=_index_mappings, ignore=400)print(res)# 打印详细的日志def detailedlog(self):logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',level=logging.DEBUG)logging.debug('debug 信息')logging.info('info 信息')logging.warning('warning 信息')logging.error('error 信息')logging.critical('critial 信息')# 插入数据def insert_data(self, p=0):ACTIONS = []p = 1bulk_num = 200# 连接mongo库client = MongoClient(CONN_ADDR1, port=27017)client.admin.authenticate(USERNAME, PASSWORD)mydb = client[DB]mycol = mydb[COLLECTION]# find()最大限制是101条 使用mycol.find({}, no_cursor_timeout=True) 查询所有Cursor = mycol.find({}, no_cursor_timeout=True)n = 0for mongoRecordRes in Cursor:data = []# for mongoRecordRes in list(mycol.find({}, no_cursor_timeout=True)):n = n + 1print("开始处理数据" + str(n) + "条")data_value = []data_name = ["corporateAnnualInfoId", "companyId", "nbYear", "annualCount", "basicInfo", "onlineStoreInfo","shareContributive", "frimAssetInfo", "socialInfo"]。。。(这里编写自己处理数据的逻辑)# print(data_value)# print(data_name)data_name_value = dict(zip(data_name, data_value))data.append(data_name_value)# print(data)# 遍历data数据for list_line in data:# 去掉引号# list_line = eval(list_line)# print("-" * 10)# print(list_line)# print("-" * 10)action = {"_index": self.index_name,"_type": self.index_type,"_id": list_line["corporateAnnualInfoId"], # _id 也可以默认生成,不赋值"_source": {"corporateAnnualInfoId": list_line["corporateAnnualInfoId"],"companyId": list_line["companyId"],"nbYear": list_line["nbYear"],"annualCount": list_line["annualCount"],"basicInfo": list_line["basicInfo"],"onlineStoreInfo": list_line["onlineStoreInfo"],"shareContributive": list_line["shareContributive"],"frimAssetInfo": list_line["frimAssetInfo"],"socialInfo": list_line["socialInfo"]}}p += 1ACTIONS.append(action)print(ACTIONS)# success, _ = helpers.bulk(self.es, action, index=self.index_name, raise_on_error=True)# 批量处理if len(ACTIONS) == bulk_num:print('插入', p / bulk_num, '批数据')print(len(ACTIONS))success, _ = helpers.bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)del ACTIONS[0:len(ACTIONS)]print(success)if len(ACTIONS) > 0:success, _ = helpers.bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)del ACTIONS[0:len(ACTIONS)]print('Performed %d actions' % success)if __name__ == '__main__':#这里IP更换为自己的obj = ElasticObj("IndexName", "IndexType",ip="es-cn-********.com")#创建索引 本人这里没有使用只是测试了一下#obj.create_index()obj.detailedlog()#执行数据的插入obj.insert_data()


推荐阅读
  • 本文介绍了SIP(Session Initiation Protocol,会话发起协议)的基本概念、功能、消息格式及其实现机制。SIP是一种在IP网络上用于建立、管理和终止多媒体通信会话的应用层协议。 ... [详细]
  • Web动态服务器Python基本实现
    Web动态服务器Python基本实现 ... [详细]
  • web页面报表js下载,web报表软件 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • 入门指南:使用FastRPC技术连接Qualcomm Hexagon DSP
    本文旨在为初学者提供关于如何使用FastRPC技术连接Qualcomm Hexagon DSP的基础知识。FastRPC技术允许开发者在本地客户端实现远程调用,从而简化Hexagon DSP的开发和调试过程。 ... [详细]
  • 本文详细介绍了如何正确设置Shadowsocks公共代理,包括调整超时设置、检查系统限制、防止滥用及遵守DMCA法规等关键步骤。 ... [详细]
  • Jupyter Notebook多语言环境搭建指南
    本文详细介绍了如何在Linux环境下为Jupyter Notebook配置Python、Python3、R及Go四种编程语言的环境,包括必要的软件安装和配置步骤。 ... [详细]
  • 本文详细介绍了如何搭建一个高可用的MongoDB集群,包括环境准备、用户配置、目录创建、MongoDB安装、配置文件设置、集群组件部署等步骤。特别关注分片、读写分离及负载均衡的实现。 ... [详细]
  • 本文档介绍了如何使用ESP32开发板在STA模式下实现与TCP服务器的通信,包括环境搭建、代码解析及实验步骤。 ... [详细]
  • C# 中创建和执行存储过程的方法
    本文详细介绍了如何使用 C# 创建和调用 SQL Server 存储过程,包括连接数据库、定义命令类型、设置参数等步骤。 ... [详细]
  • 本文介绍了实时流协议(RTSP)的基本概念、组成部分及其与RTCP的交互过程,详细解析了客户端请求格式、服务器响应格式、常用方法分类及协议流程,并提供了SDP格式的深入解析。 ... [详细]
  • 如何在U8系统中连接服务器并获取数据
    本文介绍了如何在U8系统中通过不同的方法连接服务器并获取数据,包括使用MySQL客户端连接实例的方法,如非SSL连接和SSL连接,并提供了详细的步骤和注意事项。 ... [详细]
  • mysql 授权!!
    为什么80%的码农都做不了架构师?MySQL的权限系统围绕着两个概念:认证-确定用户是否允许连接数据库服务器授权-确定用户是否拥有足够的权限执 ... [详细]
author-avatar
临冬将至
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有