Apache Sqoop Cookbook 笔记
一导入:
1 全表导入(mysql 2 hdfs)
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--tablecities
2 全表导入(mysql 2hdfs)指定到入文件夹
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--passwordsqoop \
--table cities \
--target-dir/etl/input/cities
或者
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--warehouse-dir/etl/input/
指定的文件夹必须不能存在,sqoop会拒绝向已经存在的文件夹写入,以避免覆盖数据
3 部分表导入(mysql 2 hdfs)
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--where"country= 'USA'"
Where语句中也可以放方法,甚至自定义函数,但是sqoop是集群并行执行的,要小心权衡,避免在集群中对相关数据库做过多操作,导致负载高等问题;如果使用函数,函数应该尽量简单
4 保护密码
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--table cities \
-P
这个会命令sqoop从键盘输入中获取密码
sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop\
--username sqoop \
--table cities \
--password-filemy-sqoop-password
将密码保存在文件中
echo "my-secret-password">sqoop.password
hadoop dfs -put sqoop.password /user/$USER/sqoop.password
hadoopdfs -chown 400 /user/$USER/sqoop.password
rm sqoop.password
sqoopimport --password-file /user/$USER/sqoop.password...
创建密码文件,设置权限为400,其他用户无法操作和查看
5 用除了“逗号分隔”的方式(Avro、SequenceFile.)
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--as-sequencefile
或者
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--as-avrodatafile
6 压缩导入数据
sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop\
--username sqoop \
--table cities \
--compress
也可以指定压缩格式
sqoop import --compress \
--compression-codecorg.apache.hadoop.io.compress.BZip2Codec
7 更快的传输
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--table cities \
--direct
不是支持每一个服务器,mysql可以,其他不一定,必须在各个节点都安装了相关数据库的本地jar包
8 覆盖表的映射类型类型
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--usernamesqoop\
--table cities \
-map-column-javac1=Float,c2=String,c3=String ...
9 设置并行map数量
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--num-mappers10
10 覆盖null值
sqoop import\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--null-string 'NULLSTR' \
--null-non-string'NULLNON'
导入hbase这个配置好像无效
11 导入全部表
sqoopimport-all-tables \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--passwordsqoop
或者
导入全部表,但是除了….
sqoop import-all-tables \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--exclude-tablescities,countries
二增量导入:
1 只增量导入新数据
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table visits \
--incremental append \
--check-column id \
--last-value1
过去的数据不会被重新载入,即时修改过
2 增量导入易变数据
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--usernamesqoop \
--password sqoop \
--table visits \
--incremental lastmodified \
--check-column last_update_date \
--last-value"2013-05-2201:01:01"
3建立job(可自动实现每次导入最新)
sqoop job \
--createvisits \
-- \
import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table visits \
--incremental append \
--check-column id \
--last-value0
执行
sqoop job --execvisits
看都有哪些job
sqoop job –list
删除job
sqoop job--delete visits
查看job
sqoop job --showvisits
用job会自动
4 job的密码存储
可以直接在定义时写,也可以写到配置文件中
...
5 在运行时增加(或重写)job参数
sqoop job --execvisits -- --verbose
如果是临时使用要特别小心,因为job无论你是采用哪种方式执行,都会把最后一行的信息存入元数据,这样有可能会丢失信息
6 机器间共享元数据
首先开启服务
sqoop metastore
然后执行时指定
sqoop job
--create visits \
--meta-connectjdbc:hsqldb:hsql://metastore.example.com:16000/sqoop\
-- \
import \
--table visits
...
配置
sqoop-site.xml
服务端口:sqoop.metastore.server.port
自动连接metastore:
...
三 从多个表自由导入(用query)
1 从两个表导入(有外键关联)
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--query 'SELECTnormcities.id, \
countries.country, \
normcities.city \
FROM normcities \
JOIN countries USING(country_id) \
WHERE $CONDITIONS' \
--split-by id \
--target-dircities
2 用自定义边界的query
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--query 'SELECTnormcities.id, \
countries.country, \
normcities.city \
FROM normcities \
JOIN countries USING(country_id) \
WHERE $CONDITIONS' \
--split-by id \
--target-dir cities \
--boundary-query"selectmin(id), max(id) from normcities"
3 给sqoop内部的mapreducejob起名
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--query 'SELECTnormcities.id, \
countries.country, \
normcities.city \
FROM normcities \
JOIN countries USING(country_id) \
WHERE $CONDITIONS' \
--split-by id \
--target-dir cities \
--mapreduce-job-namenormcities
4 从多表导入时有重复列
--query "SELECT\
cities.city AS first_city \
normcities.city AS second_city \
FROM cities \
LEFTJOIN normcities USING(id)"
四 导出
1 从hadoop环境导出到mysql数据库
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--export-dircities
2 批量导出
首先
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--export-dir cities \
--batch
定义每个语句的数量
sqoop export\
-Dsqoop.export.records.per.statement=10 \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--export-dircities
然后定义每个事物提交的数量
sqoop export\
-Dsqoop.export.statements.per.transaction=10\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--export-dircities
3 保证事务性(或者全部转移成功或者都不转移)
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--staging-tablestaging_cities
用中间表
可以加下面这句来清空中间表
--clear-staging-table
4 更新已经存在的数据集
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--update-keyc2,c4
5 一次插入和更新
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--update-key id \
--update-modeallowinsert
6 使用存储过程
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--callpopulate_cities
7 导出的列少于目标表的列时
指定hdfs中存储的是哪些列
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--columnscountry,city
8 为null值设置不同的值
sqoop export\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--input-null-string '\\N'\
--input-null-non-string'\\N'
9 导出数据出错
读hadoop的日志
五 hadoop生态系统集成
1 和oozie整合,用oozie启动sqoop命令
...
...
为避免无法识别command里的空格,可以用如下方法
...
等待
2 oozie中的command指令无需转移
比如
sqoopimport --password "spEci@l\$" --connect'jdbc:x:/yyy;db=sqoop'
可直接写成
[
3 oozie中配置属性参数
...
...
4 安装 oozie JDBC Driver
放到相关目录下,如需,查本书(Apache Sqoop Cookbook)
5 将数据直接导入hive
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hive-import
改变键的类型可以用如下
sqoop import \
...
--hive-import \
--map-column-hiveid=STRING,price=DECIMAL
可以用如下命令,会清空这个hive表,再导入
--hive-overwrite
6 导入分区的hive表
sqoop import \
--connect jdbc:mysql://mysql.example.com/sqoop\
--username sqoop \
--password sqoop \
--table cities \
--hive-import \
--hive-partition-key day \
--hive-partition-value"2013-05-22"
7 导入hive时去掉源数据中的在hive中是分隔符的字符
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hive-import \
--hive-drop-import-delims
会去掉\n, \t, 和 \01
或者将这些改成指定字符串:
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hive-import \
--hive-delims-replacement"SPECIAL"
8 导入hive时null值的处理
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hive-import \
--null-string '\\N'\
--null-non-string'\\N'
9 导入hbase
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hbase-table cities \
--column-familyworld
如果需要自动创建则添加
--create-hbasetable
10 将行健也存入列中(本来是不存的)
sqoop import \
-Dsqoop.hbase.add.row.key=true\
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hbase-table cities \
--column-familyworld
11 导入hbase时提高性能
提前建立hbase表,将分区设的比较大
hbase> create 'cities', 'world', {NUMREGIONS=>20,SPLITALGO=>'HexString
Split'}
六 各种数据库专门的连接器
1postgreSQL中loolean值的重写
sqoop import \
--connectjdbc:postgresql://postgresql.example.com/database \
--username sqoop \
--password sqoop \
--direct \
--table table_with_booleans \
-- \
--boolean-true-string 1 \
--boolean-false-string 0
将true改成1,false改成0
这两个参数只有导入时有用,导出不生效
2 postgreSql 导入Custom Schema
sqoop import \
--connectjdbc:postgresql://postgresql.example.com/database \
--username sqoop \
--password sqoop \
--table cities \
-- \
--schema us
3 postgreSql pg_bulkload
4 连接mysql问题
5 直接从mysql导入到hive时null值的设置
sqoop import \
--connectjdbc:mysql://mysql.example.com/sqoop \
--username sqoop \
--password sqoop \
--table cities \
--hive-import \
--null-string '\\N'\
--null-non-string '\\N'
6 导到mysql时自动辨别更新或插入
--update-key
用来标识唯一列,用于判断是更新还是插入
7 oracle 导入时用户名密码区分大小写
sqoop import \
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table KATHLEEN.cities
8 导入oracle时 使用Synonyms
sqoop import \
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table CIT \
--driver oracle.jdbc.OracleDriver \
--connection-managerorg.apache.sqoop.manager.GenericJdbcManager
9 oracle时提高传输速度
用OraOop
10 用oraoop将oracle导入avro
sqoop import \
-Doraoop.timestamp.string=false\
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table cities \
--as-avrodatafile
sqoop import \
-Doraoop.timestamp.string=false\
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table cities \
--as-avrodatafile \
--map-column-java CREATED=String,UPDATED=String
11 为oracle选择适当的连接器
首选OraOop,不行再用内置连接器,不行再用the Generic JDBCConnector
Choose the OraOop connector:
sqoop import \
--connection-managercom.quest.oraoop.OraOopConnManager \
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table cities
Choose the built-in Oracle connector:
sqoop import \
--connection-managerorg.apache.sqoop.manager.OracleManager \
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table cities
And finally, choose the Generic JDBCConnector:
sqoop import \
--connection-managerorg.apache.sqoop.manager.GenericJdbcManager \
--driver oracle.jdbc.OracleDriver \
--connectjdbc:oracle:thin:@oracle.example.com:1521/ORACLE \
--username SQOOP \
--password sqoop \
--table cities
12 导出到Teradata
sqoop export\
-Dsqoop.export.records.per.statement=1\
--connectjdbc:teradata://teradata.example.com/DATABASE=database\
--username sqoop \
--password sqoop \
--table cities\
--export-dir cities
13 导出到Teradata时使用ClouderaTeradata连接器
下载Cloudera Teradata Connector
14 导出到Teradata时使用长列名
sqoop import \
--connectjdbc:teradata://teradata.example.com/DATABASE=database\
--username sqoop \
--password sqoop \
--query "SELECTREALLY_LONG_COLUMN_NAME_30CHAR AS shorter_column_name \
FROM table"