作者:蒋易之 | 来源:互联网 | 2023-06-10 15:32
背景业务系统库数据包含了大量历史数据,核心的表超过千万级甚至亿级后,传统在业务库上做数据分析已不合时宜,需要迁移至大数据平台(hivesparksqlimpala)做数据分析,如果
背景
业务系统库数据包含了大量历史数据,核心的表超过千万级甚至亿级后,传统在业务库上做数据分析已不合时宜,需要迁移至大数据平台(hive/spark sql/impala)做数据分析,如果按天全量导入至平台不仅消耗大量服务器资源并且全量读取业务库全表速度也会超慢,这时需要增量导入的功能,因为业务系统的表会用自增ID的标志,可以按天截取新增数据导入平台。
sqoop增量迁移数据方式对比
一种是 append,即通过指定一个递增的列,比如:
–incremental append –check-column num_iid –last-value 0
另种是可以根据时间戳,比如:
–incremental lastmodified –check-column created –last-value ‘2012-02-01 11:0:00’
就是只导入created 比’2012-02-01 11:0:00’更大的数据。
第一种适合业务系统库,一般业务系统表会通过自增ID作为主键标识唯一性。
第二种适合ETL的数据
sqoop append模式使用
1.使用 sqoop create-hive-table 生成 hive表结构
2.定义 sqoop job,实际上是一个通道,通道的始发站为mysql对应的表,终点站为hive对应的表
3.使用 sqoop job执行增量导入
注:自己写个shell定时跑批或者放到调度系统定时执行
下面为整个迁移的脚本示例:
#!/bin/bash
##############################################
## $1:日期 $2:表名
## 第一个参数为日期,第二个参数为mysql表名
##############################################
#配置所在数据库地址
conf_dbhost=xxx
#配置所在数据库用户名
conf_username=xxx
#配置所在数据库密码
conf_password=xxx
#配置所在数据库名
conf_dbname=etl
var_etl_date=`mysql -h $conf_dbhost -u$conf_username -p$conf_password -D $conf_dbname -e "SELECT var_value FROM para_etl_var WHERE var_name='{ETL_DATE}';"`
echo $var_etl_date
sys_date=`date -d'-1 day' +%Y-%m-%d`
if [ ${1} == "-" ]
then
# cur_date='2016-09-23'
cur_date=${var_etl_date:10:10}
echo $cur_date
else
#echo "$1"
cur_date=`date --date="${1}" +%Y-%m-%d`
echo $cur_date
fi
echo "$cur_date"
#exit
year=`date --date=$cur_date +%Y`
mOnth=`date --date=$cur_date +%m`
day=`date --date=$cur_date +%d`
echo "cur_date:"${cur_date}
#hive库名
hdb=rmdb
#hive表名
hive_table=crm_intopieces_dk
#mysql表名
mysql_table=crm_intopieces_dk
#数据仓库基础路径
basedir=/rmdb
#mysql服务器地址
server=xxx
#mysql端口号
port=3306
#mysql数据库名
mysql_database=test
#用户名
username=xxx
#密码
password=xxx
#判断Hive是否存在,不存在执行下面创建语句,否则跳过
#hive -e "use $hdb;select * from $hive_table limit 1;"
if [ $? -ne 0 ]
then
echo "表不存在,执行创建表结构"
sqoop create-hive-table
--connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false
--username $username
--password $password
--table $mysql_table
else
echo "表已存在,执行增量导入。。。"
fi
#exit
#
#一种是 append,即通过指定一个递增的列,比如:
#--incremental append --check-column num_iid --last-value 0
#另种是可以根据时间戳,比如:
#--incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'
#就是只导入created 比'2012-02-01 11:0:00'更大的数据。
echo "创建job"
#append
sqoop job
--create crm_intopieces_dk
-- import --connect jdbc:mysql://$server:$port/$mysql_database?tinyInt1isBit=false
--username $username
--password $password
--table $mysql_table
--hive-import --hive-table $hive_table
--incremental append
--check-column id
--last-value 0
echo "append增量导入模式启动。。。"
sqoop job --exec crm_intopieces_dk
exit