热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用Sqoop从MySQL同步表到Hive集群

Sqoop是Cloudera公司创造的一个数据同步工具,现在已经完全开源了。目前已经是hadoop生态环境中数据迁移的首选,另外还有ali开发的DataX属于同类型工具,由于社

Sqoop 是 Cloudera 公司创造的一个数据同步工具,现在已经完全开源了。 

目前已经是 hadoop 生态环境中数据迁移的首选,另外还有 ali 开发的 DataX 属于同类型工具,由于社区的广泛使用和文档的健全,调研之后决定使用 Sqoop 来做我们之后数据同步的工具。

我们首先来看下 Sqoop 的工作流

 

 他将我们传统的关系型数据库 | 文件型数据库 | 企业数据仓库 同步到我们的 hadoop 生态集群中。

同时也可以将 hadoop 生态集群中的数据导回到传统的关系型数据库 | 文件型数据库 | 企业数据仓库中。

那么 Sqoop 如何抽取数据呢

1. 首先 Sqoop 去 rdbms 抽取元数据。

2. 当拿到元数据之后将任务切成多个任务分给多个 map。

3. 然后再由每个 map 将自己的任务完成之后输出到文件。

 

Sqoop import Command:

先从最简单的任务开始

sqoop import\
  --connect jdbc:mysql://10.66.38.125:3306/user_db \
--username cloudera \
--password secretkey \
--table department \
--target-dir /sqoopdata/departments \      # HDFS 的目标存储位置
--where "department_id = 1000" \         # 指定条件,只有达成这个条件的才会被 import 进来
-- m 1

就这个语句就可以将我们关系型数据库中的某个表 import 进 HDFS 的某个位置。

 

同样我们可以 import 某些字段进来生成文件

sqoop import \
  --connect jdbc:mysql://localhost:3306/retry_db \
  --username cloudera \ 
  --password secret \ 
  --table departments \
  --columns "dept_id, name" \  # 指定需要的字段
  --as-avrodatafile        # 指定存成 avro 数据文件

 

如果我们要 import 一个库里面的所有表可以使用

sqoop import-all-tables \
  --connect jdbc:mysql://localhost:3306/retry_db \
  --username cloudera \
  --password secret \
  --warehouse-dir /mydata    # HDFS parent for table 这个会将所有这些表都放到 HDFS 这个文件夹下面

 

Sqoop import Command:

我们将数据从 Hadooop HDFS 导出向 RDBMS

sqoop export \
  --connect jdbc:mysql://localhost:3306/retry_db \
  --username cloudera \
  --password departments \
  --export-dir /sqoopdata/departments \    # HDFS source path for the export
  --table departments

 

Sqoop Job:

Sqoop 提供一种能力,可以把我们经常会执行的任务存储成 jobs. 这些 jobs 可以在未来任何一个时间点被我们拿来使用。

sqoop job \
  --create job_name \
  --import \
  --connect jdbc:mysql://localhost:3306/retry_db \
  --username cloudera \
  --password departments 

 

常用姿势上面就介绍完了,当我们需要将 MySQL 数据同步到 Hive 去的时候如果表还没有创建我们只需要执行:

sudo-u hive sqoop import \
--connect jdbc:mysql://10.66.38.15:3306/user \      # 连接需要被同步的 MySQL
--username xxx \
--password xxx \
--table user \                         # 需要被同步的表
--delete-target-dir \                     # 之前有同步的文件已经存在删除掉- m 1 \                             # 开一个 map 这个值得注意,不是每个 source 表都可以轻松被分为多个 map 的。如果你这里要填写非 1 的数最好去熟悉一些细节
--hive-import \                         
--hive-tableuser.user \
--create-hive-table \                     # 创建 hive 表
--hive-drop-import-delims                  # Drops \n, \r, and \01 from string fields when importing to Hive.

 

如果是表已经创建好而需要全量同步数据执行:

sudo -u hive sqoop import\
--connect jdbc:mysql://10.66.38.125:16033/user \
--username xxx \
--password xxx \
--table user \
--delete-target-dir \
--hive-overwrite \          # 全量重写数据
- m 1 \
--hive-import \
--hive-table user.user \
--hive-drop-import-delims

 

同样的 Sqoop 还支持 Hive 的增量同步。但是基于 mapreduce 的全量同步速度也快得超出想象。实测在三机集群上(12核 | 32内存)机器上1分钟基本能完成对 20 个字段左右的表 250w 条记录的抽取。并且对目标数据库机器的压力不算大。是非常理想的大数据同步工具。

 

Sqoop 的配置参数非常之多,在使用的时候建议先撸一遍文档(文档不长大概撸一次 2 3 个小时左右),找到自己需要注意的地方和更多适合自己的功能在使用的时候就能避免踩坑。比如上面使用的   hive-drop-import-delims 参数的使用就是还没看完文档就使用造成的坑。我们数据库中有字段没有过滤 \n 。有个用户的名字被误操作使用 \n 开头导致 hive 误以为遇到了换行符,该数据不仅错乱而且后面字段全部被置为 NULL。要避免这种问题一方面要对这个使用链上各个组件有所了解,更是应该读一读文档可以最大程度的避免踩坑。

----------------------------------------------------------分割线----------------------------------------------------------

下面将纪录一下我全量阅读 Sqoop 文档觉得需要纪录的一些东西。

首先我们上面看到命令 Sqoop Command 这个 Command 其实是指定 Sqoop 使用哪种 Tool 。

$ sqoop help
usage: sqoop COMMAND [ARGS]

Available commands:
  codegen            Generate code to interact with database records
  create-hive-table  Import a table definition into Hive
  eval               Evaluate a SQL statement and display the results
  export             Export an HDFS directory to a database table
  help               List available commands
  import             Import a table from a database to HDFS
  import-all-tables  Import tables from a database to HDFS
  import-mainframe   Import mainframe datasets to HDFS
  list-databases     List available databases on a server
  list-tables        List available tables in a database
  version            Display version information

See 'sqoop help COMMAND' for information on a specific command.

可以看到我上面举例的所有内容都只是简单的使用到了 export 和 import 还有 import-all-tables  工具。 还有非常多的工具没有使用到。

因为 sqoop 是依赖 hadoop 生态的关系,所以也有响应的查找链,因为使用了 CDH 大礼包,所以我只是简单的安装了一下,相关的依赖都已经被配置好了包括 path

lrwxrwxrwx 1 root root 23 Nov 13 20:55 /usr/bin/sqoop -> /etc/alternatives/sqoop

 

下面我们在使用 import tool 的时候遵循这个原则:

sqoop import (generic-args) (import-args)
sqoop-import (generic-args) (import-args)
While the Hadoop generic arguments must precede any import arguments, you can type the import arguments in any order with respect to one another.

当我们在写语句的时候应该首先使用了 generic-args 参数可以是以下的参数。

Argument Description
--connect Specify JDBC connect string
--connection-manager Specify connection manager class to use
--driver Manually specify JDBC driver class to use
--hadoop-mapred-home Override $HADOOP_MAPRED_HOME
--help Print usage instructions
--password-file Set path for a file containing the authentication password
-P Read password from console
--password Set authentication password
--username Set authentication username
--verbose Print more information while working
--connection-param-file Optional properties file that provides connection parameters
--relaxed-isolation Set connection transaction isolation to read uncommitted for the mappers.

后面的 import args 可选项就非常丰富。

比如可以导入校验使用的 class 删除控制参数啥的。

Argument Description
--validate Enable validation of data copied, supports single table copy only.
--validator Specify validator class to use.
--validation-threshold Specify validation threshold class to use.
--validation-failurehandler Specify validation failure handler class to use.

 

 option.
Argument Description
--append Append data to an existing dataset in HDFS
--as-avrodatafile Imports data to Avro Data Files
--as-sequencefile Imports data to SequenceFiles
--as-textfile Imports data as plain text (default)
--as-parquetfile Imports data to Parquet Files
--boundary-query Boundary query to use for creating splits
--columns Columns to import from table
--delete-target-dir Delete the import target directory if it exists
--direct Use direct connector if exists for the database
--fetch-size Number of entries to read from database at once.
--inline-lob-limit Set the maximum size for an inline LOB
-m,--num-mappers Use n map tasks to import in parallel
-e,--query Import the results of statement.
--split-by Column of the table used to split work units. Cannot be used with --autoreset-to-one-mapper option.
--split-limit Upper Limit for each split size. This only applies to Integer and Date columns. For date or timestamp fields it is calculated in seconds.
--autoreset-to-one-mapper Import should use one mapper if a table has no primary key and no split-by column is provided. Cannot be used with --split-by
--table Table to read
--target-dir HDFS destination dir
--temporary-rootdir HDFS directory for temporary files created during import (overrides default "_sqoop")
--warehouse-dir HDFS parent for table destination
--where WHERE clause to use during import
-z,--compress Enable compression
--compression-codec Use Hadoop codec (default gzip)
--null-string The string to be written for a null value for string columns
--null-non-string The string to be written for a null value for non-string columns

 

包括支持 free-form query .使用 --query 参数然后写一个 sql 来过滤自己想要 import 的数据 just like 

$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id == b.id) WHERE $CONDITIONS' \
  --split-by a.id --target-dir /user/foo/joinresults

这个使用方法必须要使用 --target-dir 

 

如果需要控制并行操作普遍使用的是 -m 参数,--num-mapers参数。我们可以显示的指定使用的用来并行分配的键,使用例如 --split-by employee_id 达到目标。

如果说我们没有使用 --split-by 参数主键也不是 int 型,可能会导致指定 -m 大于 1 的时候出问题。因为程序没有办法知道应该根据哪个键来分配 map 任务。

但是我们可以使用 --autoreset-to-one-mapper 选项 --autoreset-to-one-mapper is typically used with the import-all-tables tool to automatically handle tables without a primary key in a schema.

 

使用 Oozie 调起 Sqoop job 执行任务的时候要注意一个 Controlling Distributed Cache 的问题。在第一个Sqoop作业期间,Oozie只会在每个工作节点上对Sqoop依赖项进行一次本地化,并会在工作节点上重用jar来执行子节点作业。在Oozie启动Sqoop命令时使用option - skip-dist-cache,可以跳过Sqoop将依赖项复制到作业缓存并保存大量I/O的步骤。达到优化的目的。

 

在控制导入的过程中也有很多优化的地方可以做,例如我们在对关系行数据库 MySQL 进行导入的时候,可以通过使用关键字 --direct 加速导入的速度。他的原理是默认情况下我们会使用 JDBC 对数据库进行连接,但是有一些数据库提供了更高性能可以指定数据库进行转移的工具。比如 MySQL 提供的 MySQL 提供的工具 mysqldump 使用 --direct 参数就可以尝试让 Sqoop 使用这种方式去导出数据,可能会得到更高的效能。

 

Reference:

https://archive.cloudera.com/cdh6/6.0.1/docs/sqoop-1.4.7-cdh6.0.1/SqoopUserGuide.html  Sqoop User Guide (v1.4.7-cdh6.0.1)

https://blog.csdn.net/Gavin_chun/article/details/78314065  SQOOP从MySQL导入数据到Hive

https://segmentfault.com/a/1190000002532293  sqoop  导入关系数据库到 hive

https://blog.csdn.net/myrainblues/article/details/43673129  sqoop使用中文手册

https://blog.csdn.net/lyp5257918/article/details/53820690  sqoop抽取文本数据到hive由于存在空字符导致字段错位和丢失错误

https://www.youtube.com/watch?v=72M5lMP8dMg  COSO IT Sqoop Tutorial 

 


推荐阅读
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • importorg.apache.hadoop.hdfs.DistributedFileSystem;导入方法依赖的package包类privatevoidtestHSyncOpe ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
  • docker安装到基本使用
    记录docker概念,安装及入门日常使用Docker安装查看官方文档,在"Debian上安装Docker",其他平台在"这里查 ... [详细]
  • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
author-avatar
自由就是幸2602880665
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有