使用Python将MongoDB数据同步到Elasticsearch
版本说明:Python 3.7,PyMongo 3.11.0,Elasticsearch 5.5.3
话不多说,直接上代码(如遇到问题,欢迎留言讨论)
from pymongo import MongoClient
from elasticsearch import Elasticsearch, helpers
import json
import logging
# MongoDB connection settings. Every value below is a placeholder that must
# be replaced before running the sync.
CONN_ADDR1 = "更换为自己的MongoDB地址"  # host passed to MongoClient
USERNAME = "MongoDB用户名"  # user name used to authenticate against MongoDB
PASSWORD = "MongoDB密码"  # password used to authenticate against MongoDB
DB = "MongoDB的库"  # name of the database to read from
COLLECTION = "MongoDB集合"  # name of the collection to read from
class ElasticObj:
    """Copy documents from a MongoDB collection into an Elasticsearch index.

    The MongoDB side is configured through the module-level constants
    ``CONN_ADDR1``, ``USERNAME``, ``PASSWORD``, ``DB`` and ``COLLECTION``;
    the Elasticsearch side is configured through the constructor.
    """

    # Names of the fields placed in each Elasticsearch document's ``_source``.
    DATA_FIELDS = (
        "corporateAnnualInfoId",
        "companyId",
        "nbYear",
        "annualCount",
        "basicInfo",
        "onlineStoreInfo",
        "shareContributive",
        "frimAssetInfo",
        "socialInfo",
    )
    # Field whose value is used as the Elasticsearch document ``_id``.
    ID_FIELD = "corporateAnnualInfoId"
    # Number of actions sent to Elasticsearch per bulk request.
    BULK_SIZE = 200

    def __init__(self, index_name, index_type, ip,
                 http_auth=('ES用户名', 'ES密码'), port=9200):
        """Create the Elasticsearch client.

        :param index_name: name of the target index
        :param index_type: mapping type used for the indexed documents
        :param ip: Elasticsearch host
        :param http_auth: ``(user, password)`` pair for HTTP authentication;
            the default is the placeholder pair from the original code
        :param port: Elasticsearch HTTP port
        """
        self.index_name = index_name
        self.index_type = index_type
        self.es = Elasticsearch([ip], http_auth=http_auth, port=port)

    def create_index(self):
        """Create ``self.index_name`` with a mapping for ``self.index_type``.

        Nothing is done when the index already exists. The creation response
        is printed.

        NOTE(review): the mapped properties (``name``, ``password``,
        ``birthplace``) are not the fields that ``insert_data`` indexes --
        adjust the mapping to the real documents before relying on it.
        """
        _index_mappings = {
            "mappings": {
                self.index_type: {
                    "properties": {
                        "name": {'type': 'text'},
                        "password": {'type': 'text'},
                        "birthplace": {'type': 'text'},
                    }
                }
            }
        }
        if not self.es.indices.exists(index=self.index_name):
            # ignore=400: do not raise on an HTTP 400 response (for example a
            # concurrent creation of the same index).
            res = self.es.indices.create(
                index=self.index_name, body=_index_mappings, ignore=400)
            print(res)

    def detailedlog(self):
        """Configure root logging at DEBUG level and emit one sample message
        per level."""
        logging.basicConfig(
            format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
            level=logging.DEBUG)
        logging.debug('debug 信息')
        logging.info('info 信息')
        logging.warning('warning 信息')
        logging.error('error 信息')
        logging.critical('critial 信息')

    def transform_record(self, record):
        """Turn one MongoDB document into the dict stored as ``_source``.

        The original code left this step as a placeholder ("write your own
        data-processing logic here"). This default copies the same-named
        fields from the MongoDB document and uses ``None`` for missing ones;
        override or edit it to implement the real transformation.

        :param record: one document (mapping) read from the collection
        :return: dict keyed by ``DATA_FIELDS``, in that order
        """
        return {name: record.get(name) for name in self.DATA_FIELDS}

    def _build_action(self, source):
        """Wrap a ``_source`` dict into a bulk-helper action for this index."""
        return {
            "_index": self.index_name,
            "_type": self.index_type,
            "_id": source[self.ID_FIELD],
            "_source": {name: source[name] for name in self.DATA_FIELDS},
        }

    def _flush(self, actions):
        """Send ``actions`` in one bulk call, empty the list, return the count.

        ``raise_on_error=True`` makes the helper raise if any action fails.
        """
        success, _ = helpers.bulk(
            self.es, actions, index=self.index_name, raise_on_error=True)
        actions.clear()
        return success

    def insert_data(self, p=0):
        """Read every document of the collection and bulk-index it.

        Documents are sent to Elasticsearch in batches of ``BULK_SIZE``; a
        final partial batch is sent after the collection is exhausted.

        :param p: ignored; kept only so existing callers keep working (the
            original implementation overwrote it immediately)
        """
        actions = []
        batch_no = 0
        # Credentials are passed to the client: Database.authenticate() is
        # deprecated in PyMongo 3.x and removed in 4.0. authSource="admin"
        # keeps authenticating against the admin database as before.
        client = MongoClient(
            CONN_ADDR1, port=27017,
            username=USERNAME, password=PASSWORD, authSource="admin")
        try:
            cursor = client[DB][COLLECTION].find({}, no_cursor_timeout=True)
            try:
                for n, record in enumerate(cursor, start=1):
                    print("开始处理数据" + str(n) + "条")
                    action = self._build_action(self.transform_record(record))
                    # Log only the new action instead of dumping the whole
                    # pending batch for every record.
                    logging.debug("queued action: %s", action)
                    actions.append(action)
                    if len(actions) >= self.BULK_SIZE:
                        batch_no += 1
                        print('插入', batch_no, '批数据')
                        print(len(actions))
                        print(self._flush(actions))
            finally:
                # A no_cursor_timeout cursor is never reaped by the server,
                # so it must be closed explicitly.
                cursor.close()
        finally:
            client.close()
        if actions:
            success = self._flush(actions)
            print('Performed %d actions' % success)


if __name__ == '__main__':
    # Placeholder values -- replace with the real index name, type and host.
    # NOTE(review): Elasticsearch index names must be lowercase.
    obj = ElasticObj("IndexName", "IndexType", ip="es-cn-********.com")
    obj.detailedlog()
    obj.insert_data()