作者:pea101 | 来源:互联网 | 2023-07-29 18:20
我们在大数据进行数据抽取(入湖)中离不开从源系统的数据库同步数据到大数据平台(hive)中。我的平台(带有调度工具):联想大数据平(LeapHD)。
同步的方式有两种:
1.增量抽取:即每次抽取表的全部数据然后覆盖表的全部数据。频率一般每天抽取或者每周每月抽取,具体看业务需求。
1.1在hive中建一张结构与源表的相同表
eg:
源表(mysql)的结构:
表名:user 字段为:user_id int,user_name varchar,create_time datetime,phone varchar ;
hive建表:
create table ods.user_id (
etl_date string COMMENT '入湖时间',
user_id string COMMENT '客户id',
user_name string COMMENT '客户姓名',
create_time string COMMENT '创建时间',
phone string COMMENT '电话'
)comment '客户信息表'
row format delimited fields terminated by '\001' --换行方式
1.2 同步 用sqoop 或者用kettle工具同步。我用的是联想平台的自带的etl工具。
抽取全量的源表,进行任务调度的设置,是每天或者每周每月(看业务需求)。
select
'${now}' etl_time,
user_id ,
user_name ,
create_time ,
phone
from dd.user
导入到创建的hive表
ods.user_id
2.增量导入
增量方式的话我处理的方法是创建分区表,按照同步方式建立相应的分区。比如在单表数据量较大(比如:100万以上)的我会按照日分区,数据量较小的按照月分区。
2.1日分区
2.1.1 创建日分区表
create table ods.user_id (
etl_date string COMMENT '入湖时间',
user_id string COMMENT '客户id',
user_name string COMMENT '客户姓名',
create_time string COMMENT '创建时间',
phone string COMMENT '电话'
)comment '客户信息表'
partitioned by (day_id string comment '日分区')
row format delimited fields terminated by '\001' --换行方式
2.1.2 进行表的初始化数据(跟全量的操作差不多,多了分区)
全量抽取
select
'${now}' etl_time,
user_id ,
user_name ,
create_time ,
phone
from dd.user
导入到创建的hive表指定临时分区
ods.user_id[day_id='1']
2.1.3 将临时分区导入到动态分区
insert overwrite table ods.user_id partition(day_id)
select
etl_time ,
user_id ,
user_name ,
create_time ,
phone ,
substr(create_time,1,10) as day_id --日期格式为 YYYY-MM-DD
from ods.user_id partition
where day_id='1'
2.1.4 删除表的临时分区
alter table ods.user_id drop partition (day_id='1');
2.1.5 完成初始化后就是增量导入增量数据
设置把昨天的日期设置成变量:{last_day} = {now} - 1
抽取昨天的数据
select
'${now}' etl_time ,
user_id ,
user_name ,
create_time ,
phone
from dd.user
where substr(create_time,1,10)='{last_day}'
同理,抽数放到临时分区
ods.user_id[day_id='000']
2.1.6 将临时分区换成动态分区
insert overwrite table ods.user_id partition(day_id)
select
etl_time ,
user_id ,
user_name ,
create_time ,
phone ,
substr(create_time,1,10) as day_id --日期格式为 YYYY-MM-DD
from ods.user_id partition
where day_id='000'
2.1.7 删除临时分区
alter table ods.user_id drop partition (day_id='000');
这样就完成每天的抽数定时任务。