作者:mobiledu2502909447 | 来源:互联网 | 2023-09-25 17:42
一、需求:
将MongoDB表中的数据按照时间戳增量抽取到Mysql表中。
二、实现方式:
1. kettle
2. python 脚本
三、遇到的问题:
kettle 如何将增量时间作为变量传入到MongoDB Input 中?
翻遍了百度,没有一篇文章能写明白的,本人将实现方式详尽记载下来,供大家参考!
四、具体实现:
1. kettle实现
实现思路:
step1: 先全量将MongoDB 导入到 Mysql 表中。(这个不难,百度中有实现。)
step2:创建第一个Transformation,获取Mysql 中最大时间字段 例如:Max(time),将其存入到Mysql的一张表中。
step2-1: 配置表输入(目的:为了取得全量表中时间字段最大值)
step2-2:设置变量
step2-3:将变量值输出到表T1中
step3:创建第二个Transformation,配置MongoDB Input -> Mysql
step3-1 表输入
step3-2 设置变量
step3-3 配置MongoDB Input
step3-4 配置字段选择
step3-5 表输出
step4:创建作业执行step2 和 step3 两个转换。
具体步骤如下:
step1:先全量将MongoDB 导入到 Mysql 表中。(略)
mongo_test 表(全量表)
step1-1
step2:创建第一个Transformation,获取Mysql 中最大时间字段 例如:Max(time),将其存入到Mysql的一张表中。
step2
step2-1 表输入
step2-2 设置变量
step2-3 将变量值保存到表t1中
step2 t1 表
step3 :创建第二个Transformation,配置MongoDB Input -> Mysql
step3
step3-1 表输入
step3-2 设置变量
step3-3 配置MongoDB Input
注意写法
这里需要注意:STRAT_TIME是step3-2中的变量名,这样就可以获取到变量值
step3-4 配置字段选择
step3-5 表输出
step4:创建作业执行step2 和 step3 两个转换。
测试:
1. 当前t1 中的时间为:
2018-05-19 00:00:00
2. 向MongoDB中添加一条数据
> db.xk.insert({“name”:”发发发发发财”,”age”:”30″,”time”:”2018-05-20 00:00:00″})
3. 执行job
4. 查看结果:
以上结束:
python 脚本方式也很简单:
==============================================================
#coding=utf-8
# Python 实现 从MongoDB 想MySql 增量导数据
from pymongo import MongoClient
import pymysql
# mongodb
client = MongoClient(‘192.168.107.128’, 27017)
TempleSpider = client[‘xk’]
temple_comment_collect = TempleSpider[‘xk’]
# mysql
mysql = pymysql.connect(‘localhost’, ‘root’, ‘123456’, ‘test’, charset=”utf8″)
# insert sql
sql = ‘insert into mongo_test values(%s,%s,%s)’
# 批量提交
def batch_commit(connect, target_sql, result):
cursor = connect.cursor()
list = []
i = 0
try:
for row in result:
data = (row[‘name’].encode(‘utf-8’), row[‘age’], row[‘time’])
print(row[‘name’])
list.append(data)
if i >= 1000:
cursor.executemany(target_sql, list)
connect.commit()
print(‘1000 条插入成功…’)
i = 0
list.clear()
i = i+1
if i > 0:
cursor.executemany(target_sql, list)
connect.commit()
print(‘%d 条插入成功……’ % i)
except Exception as e:
print(e)
print(“增量数据导入失败!!!”)
# 回滚
connect.rollback()
records = temple_comment_collect.find({“time”: {“$gt”: “2018-05-15 00:00:00”}})
batch_commit(mysql, sql, records)
client.close()
mysql.close()
==============================================================