Sqoop provides an incremental import mode which can be used to retrieve only rows newer than some previously-imported set of rows.
The following arguments control incremental imports:
Table 4. Incremental import arguments:
Argument |
Description |
--check-column (col) |
Specifies the column to be examined when determining which rows to import. |
--incremental (mode) |
Specifies how Sqoop determines which rows are new. Legal values for mode include append and lastmodified . |
--last-value (value) |
Specifies the maximum value of the check column from the previous import. |
Sqoop supports two types of incremental imports: append
and lastmodified
. You can use the --incremental
argument to specify the type of incremental import to perform.
You should specify append
mode when importing a table where new rows are continually being added with increasing row id values. You specify the column containing the row’s id with --check-column
. Sqoop imports rows where the check column has a value greater than the one specified with --last-value
.
An alternate table update strategy supported by Sqoop is called lastmodified
mode. You should use this when rows of the source table may be updated, and each such update will set the value of a last-modified column to the current timestamp. Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value
are imported.
At the end of an incremental import, the value which should be specified as --last-value
for a subsequent import is printed to the screen. When running a subsequent import, you should specify --last-value
in this way to ensure you import only the new or updated data. This is handled automatically by creating an incremental import as a saved job, which is the preferred mechanism for performing a recurring incremental import. See the section on saved jobs later in this document for more information.
通过官网不难看出 sqoop 的增量导入需要三个参数 --check-column --incremental --last-value
时间增量举例:
首先数据库的数据如下图所示:
现在的时间是 8月27号。28,29 号 的日期是测试数据,观察结果使用;这里想实现的功能就是,每天12点导入一次数据,这样就得使用时间增量导入;
脚本如下:
#!/bin/bash
#Set the RDBMS connection params
rdbms_cOnnstr="jdbc:oracle:thin:@192.168.0.105:1521:ORCL"
rdbms_username="hbea"
rdbms_pwd="fulong"
#Set the source table in RDBMS
rdbms_table="action_log"
rdbms_columns="ACTION_TIME,USER_ID,RCOOKIE_ID,SESSION_ID,LOGIN_STATUS,USER_NEW_OR_OLD,ITEM_ID,ITEM_URL,ITEM_PRICE,ITEM_NUM,ORDER_TOTAL,ITEM_TYPE,ACTION_TYPE,SEARCH_WORD,SEARCH_USE_URL,SEARCH_USE_TITLE,NATION_ENGLISH,NATION_CHINESE,LOCATION_AREA_CODE,LOCATION_AREA_WORD,TERMINAL_SIZE,TERMINAL_TYPE,IP,LL_GRID,APP_SYSTEM_ID,OS,IF_FROMREC,EXPLORER,EQUIPMENT,PAGE_TYPE"
#Set the hive tables
hive_full_table="T_AC_RAW_ACTION_LOG"
hive_database="HEBEI_SP_RECS"
datenow=$(date -d last-day +%Y-%m-%d)
#---------------------------------------------------------
#Import init data in RDBMS into Hive
$SQOOP_HOME/bin/sqoop import --connect ${rdbms_connstr} --username ${rdbms_username} --password ${rdbms_pwd} --table ${rdbms_table} --columns "${rdbms_columns}" --hive-import --hive-database ${hive_database} --hive-table ${hive_full_table} --fields-terminated-by "\t" --lines-terminated-by "\n" --hive-drop-import-delims –-check-column ACTION_TIME –-incremental lastmodified --last-value ${datenow} -m 5 --split-by "RCOOKIE_ID";
红色部分就是 增量的一个参数 意思是 根据 action_time 去增量,时间增量 用lastmodified 这个参数,增量部分从 lastvalue开始,这里的datenow获取的是当天时间-1;到底这样当天时间-1会得到什么结果呢,下面来看一下脚本日志;
执行脚本,得到脚本有用信息:
从这里看出来 sqoop 是这样过滤数据的,使用的 action_time >= ${datenow} and action_time<当前时间 过滤出来的数据;
拿这句话到oracle执行,得到结果:
然后去hive 看一下结果 : 我的hive指向的是hdfs存储,所以这里我就不查询hive了,直接去hdfs 上看数据
数据一致;
这里大家可以观察到 他并没有把数据库28,29 日期的数据给导入进去,说明他的增量时间指的结束时间就是当前时间!而不是所有大于${datenow}的时间;
还有一个问题就是,我想要的其实就是今天的数据,不包含26的数据,所以脚本中,可以吧datenow变量换成当天的时间;
datenow=$(date +%Y-%m-%d)
再次执行
数据出来的就只有今天的时间;搞定OK;