一、实测结果
-
业务无感知,无死锁平滑
-
线上800万数据以下 直接使用 alter 新增字段 300ms左右
-
2000万数据,强制使用主键索引,每次查询50万数据 并插入新表 ,耗时 20s ,cpu 占45%
二、整体步骤
-
创建新表 biz_table_new
-
查询当前表first_max_id
- 确定当前同步数据量
37509688
-
将数据同步至新表
insert into biz_table_new(id,app_trade_no,trade_no,trade_sub_no,uid,account_type,relate_uid,relate_account_type,biz_type,biz_subtype,in_out,amount,balance_before,remark,extra,created_at,updated_at)
select * from journal_2022 force index(PRIMARY)
where id>0 and id <&#61;500000;select sleep(10);
insert into biz_table_new(id,app_trade_no,trade_no,trade_sub_no,uid,account_type,relate_uid,relate_account_type,biz_type,biz_subtype,in_out,amount,balance_before,remark,extra,created_at,updated_at)
select * from biz_table force index(PRIMARY)
where id>500000 and id <&#61;1000000;select sleep(10);
....
- 分批次查询&#xff0c;根据步长设置区间
- 每次执行完毕休眠10s
-
验证数据一致性&#xff0c;截止到first_max_id的数据同步完成
- 根据max_id 验证历史数据是否一致
select count(1) from biz_table where id<&#61;37509688;
select count(1) from biz_table_new where id<&#61;37509688;
-
评估1分钟后的increment值
- 查询当前max_id 如&#xff1a;37609688
- 当前业务&#xff0c;增长量 &#xff0c;计算1分钟内的数据量&#xff0c;如&#xff1a;200000
- 得出预计的increment 值 如&#xff1a;37609688&#43;200000&#61;37809688
-
执行表迁移
- 设置新表的Increment 值
- 重命名 biz_table 为 biz_table_bak
- 重命名 new_table 为 biz_table
alter table biz_table_new AUTO_INCREMENT &#61;37809688;
rename table biz_table to biz_table_bak;
rename table biz_table_new to biz_table;
-
同步新增数据量
- 计算biz_table_bak 的max_id 为 last_max_id 如
37808688
- 根据first_max_id () 、last_max_id 获取此区间数据&#xff0c;同步至新表
insert into biz_table_new(id,app_trade_no,trade_no,trade_sub_no,uid,account_type,relate_uid,relate_account_type,biz_type,biz_subtype,in_out,amount,balance_before,remark,extra,created_at,updated_at)
select * from biz_table force index(PRIMARY)
where id>37509688 and id <&#61;37808688
-
数据同步完毕
三、GO脚本分享
package utils
import "fmt"
const diffTmp &#61; &#96;
insert into %s(id,uid,account_type,balance,frozen,status,created_at,updated_at,app_alias)
select *,"lailiao" as app_alias from %s_bak force index(PRIMARY)
where id>%d and id <&#61;%d;select sleep(1);
&#96;
func SyncDataV2(tmp string, oldTable, newTable string, step, minId, maxId int) string {
if tmp &#61;&#61; "" {
tmp &#61; defaultTmp
}
var sql string
if minId < 0 {
minId &#61; 0
}
var nextId int
for i :&#61; minId; i < maxId; i&#43;&#43; {
nextId &#61; minId &#43; step
if nextId > maxId {
nextId &#61; maxId
}
sql &#43;&#61; fmt.Sprintf(tmp, newTable, oldTable, minId, nextId)
minId &#61; nextId
if minId &#61;&#61; maxId {
break
}
}
return sql
}
func SyncDiffData(tmp string, oldTable, newTable string, step, minId, maxId int) string {
if tmp &#61;&#61; "" {
tmp &#61; diffTmp
}
var sql string
if minId < 0 {
minId &#61; 0
}
var nextId int
for i :&#61; minId; i < maxId; i&#43;&#43; {
nextId &#61; minId &#43; step
if nextId > maxId {
nextId &#61; maxId
}
sql &#43;&#61; fmt.Sprintf(tmp, newTable, oldTable, minId, nextId)
minId &#61; nextId
if minId &#61;&#61; maxId {
break
}
}
return sql
}
func VerifyData(oldTable, newTable string, max int) string {
sql :&#61; fmt.Sprintf("select count(1) from %s where id<&#61;%d;", oldTable, max)
sql &#43;&#61; fmt.Sprintf("select count(1) from %s where id<&#61;%d;", newTable, max)
return sql
}
func RenameTable(oldTable, newTable string, incrementId int) (sql string) {
sql &#43;&#61; fmt.Sprintf("alter table %s AUTO_INCREMENT &#61;%d;", newTable, incrementId)
sql &#43;&#61; fmt.Sprintf("rename table %s to %s;", oldTable, oldTable&#43;"_bak")
sql &#43;&#61; fmt.Sprintf("rename table %s to %s;", newTable, oldTable)
return sql
}
func TestAccount(t *testing.T) {
oldTable :&#61; "account"
newTable :&#61; "account_new"
step :&#61; 100000
min :&#61; 0
max :&#61; 952317
tmp :&#61; &#96;
insert into %s(id,uid,account_type,balance,frozen,status,created_at,updated_at,app_alias)
select *,"xxx" as app_alias from %s force index(PRIMARY)
where id>%d and id <&#61;%d;select sleep(10);
&#96;
template :&#61; SyncDataV2(tmp, oldTable, newTable, step, min, max)
t.Log(template)
verify :&#61; VerifyData(oldTable, newTable, max)
t.Log("Verify:\n",verify)
increment :&#61; max &#43; 20000
sql :&#61; RenameTable(oldTable, newTable, increment)
t.Log("SyncData:\n",sql)
tmp &#61; &#96;
insert into %s(id,uid,account_type,balance,frozen,status,created_at,updated_at,app_alias)
select *,"lailiao" as app_alias from %s_bak force index(PRIMARY)
where id>%d and id <&#61;%d;select sleep(1);
&#96;
template &#61; SyncDiffData(tmp, oldTable, oldTable, step, max, increment)
t.Log(template)
}