功能:
将 MySQL 指定表的数据导入到 MongoDB 的指定集合。导入过程保证数据不丢失;源数据有更新时也会重新导入,并覆盖目标中 id 相同的记录。
要求:
源表必须包含两个字段:id(主键)和 mmm_ts(最后更新时间戳)。
require 'rubygems'
require 'mongo'
require 'active_record'
# MongoDB endpoints, keyed by a symbolic environment name.
mongo_server = {
  :local => {:server => "localhost", :port => 27017}
}
# MySQL connection settings, keyed by a symbolic environment name.
# Every entry uses the same adapter and encoding, so those are factored out;
# the :database key is intentionally absent here.
mysql_defaults = {:adapter => 'mysql', :encoding => 'utf8'}
mysql_server = {
  :local => mysql_defaults.merge(
    :host => 'localhost',
    :username => 'root',
    :password => '$$$'
  ),
  :main_db => mysql_defaults.merge(
    :host => 'mydb.com',
    :username => 'abc',
    :password => 'cba'
  )
}
# Incrementally imports the rows of a MySQL table into a MongoDB collection.
#
# The source table must have two columns: id (auto-increment key) and mmm_ts
# (timestamp). Each run reads a bounded number of rows starting at the
# timestamp where the previous run stopped, upserts them into the target
# collection (a row whose id already exists is overwritten), and then records
# the new stopping point.
class MMM
  # mysql_server   - Hash of ActiveRecord connection settings; the :database
  #                  key is supplied per call by #migrate.
  # mongo_server   - Hash with :server and :port.
  # operation_type - name under which the checkpoint is stored and looked up.
  def initialize(mysql_server, mongo_server, operation_type)
    @mysql_server = mysql_server
    @mongo_server = mongo_server
    @operation_type = operation_type
  end

  # Copies one batch of rows from source_db.source_table into
  # target_db.target_table.
  #
  # source_db               - MySQL database name.
  # source_table            - MySQL table name.
  # process_items_each_time - maximum rows per batch (raised automatically
  #                           when too many rows share the checkpoint
  #                           timestamp, see below).
  # target_db               - MongoDB database name.
  # target_table            - MongoDB collection name.
  #
  # Any error raised by the underlying drivers propagates to the caller; the
  # MongoDB connection opened here is closed either way.
  def migrate(source_db, source_table, process_items_each_time, target_db, target_table)
    connection = Mongo::Connection.new(@mongo_server[:server], @mongo_server[:port])
    mongodb = connection.db(target_db)
    mongo_collection = mongodb.collection(target_table)

    # Merge instead of assigning into @mysql_server so the caller's settings
    # hash is not modified.
    ActiveRecord::Base.establish_connection(@mysql_server.merge(:database => source_db))
    model = Class.new(ActiveRecord::Base) do
      set_table_name source_table
    end

    operation_point = OperationPoint.new(mongodb)
    saved = operation_point.get(@operation_type)
    if saved.nil?
      # No checkpoint yet: start from the beginning and make sure the target
      # has a unique index on id.
      checkpoint = 0
      mongo_collection.create_index("id", :unique => true)
    else
      checkpoint = local_checkpoint(saved["point"])
    end

    # If at least a full batch of rows share the checkpoint timestamp, a batch
    # of the requested size could never advance past it; widen the batch by
    # one row beyond that group. Large groups of identical mmm_ts values
    # should still be avoided.
    tied = model.count(:conditions => ["mmm_ts=?", checkpoint])
    limit = tied >= process_items_each_time ? tied + 1 : process_items_each_time

    records = model.find(:all, :conditions => ["mmm_ts>=?", checkpoint], :order => "mmm_ts", :limit => limit)
    puts("#{records.length} records read from mysql")

    columns = model.column_names
    records.each do |record|
      document = {}
      columns.each { |column| document[column] = record.read_attribute(column) }
      mongo_collection.update({:id => record.id}, document, :upsert => true)
      checkpoint = record.mmm_ts
    end

    # Only persist a checkpoint that came from an actual row. Saving the
    # initial placeholder 0 for an empty batch would store a non-time value.
    operation_point.save(@operation_type, checkpoint) unless records.empty?
    puts("#{records.length} records saved to mongodb")
  ensure
    # migrate is called repeatedly; do not leave one open connection per call.
    connection.close if connection
  end

  private

  # Converts a stored checkpoint to local time when it is time-like; any other
  # value (e.g. a placeholder 0 written by an earlier run) is passed through.
  def local_checkpoint(point)
    point.respond_to?(:getlocal) ? point.getlocal : point
  end
end
# Operation checkpoint store: keeps one document per named operation in the
# "operation_point" collection of the database handle it is given.
class OperationPoint
  # Binds to the "operation_point" collection of +db+ and requests a unique
  # index on "operation".
  def initialize(db)
    @checkpoints = db.collection("operation_point")
    @checkpoints.create_index("operation", :unique => true)
  end

  # Upserts the document for +operation+ so that its :point is +point+.
  def save(operation, point)
    selector = {:operation => operation}
    document = {:operation => operation, :point => point}
    @checkpoints.update(selector, document, :upsert => true)
  end

  # Returns what find_one yields for the document matching +operation+.
  def get(operation)
    @checkpoints.find_one({:operation => operation})
  end
end
# Driver: repeatedly migrate batches of mdb_production.bet_plans into the
# export_mdb collection, forever.
mmm = MMM.new(mysql_server[:main_db], mongo_server[:local], "export_mdb")
loop do
  begin
    mmm.migrate("mdb_production", "bet_plans", 3000, "mdb_production", "export_mdb")
  rescue StandardError => ex
    # Rescue StandardError rather than Exception so that Interrupt (Ctrl-C)
    # and SystemExit still terminate the process instead of being logged and
    # retried.
    puts ex.message
    puts ex.backtrace.join("\n")
    # Back off before retrying after a failed batch.
    sleep(30)
  ensure
    # Brief pause between batches.
    sleep(0.001)
  end
end
分享到:
![18e900b8666ce6f233d25ec02f95ee59.png](https://img.php1.cn/3cd4a/1eebe/cd5/ed19db63ee478b98.png)
![72dd548719f0ace4d5f9bca64e1d7715.png](https://img.php1.cn/3cd4a/1eebe/cd5/8343fdbffb0056b5.webp)
2011-05-21 12:01
浏览 2089
评论