(下面说的操作hdfs其实和操作hive意思一样,都是文件夹)
最近要在项目中加一个sqoop的功能,需求是将hive的数据导入至mysql,也就是export功能
由于之前没用过sqoop,所以特地去学习怎么使用,这里总结下这两天了解到的简单内容
首先sqoop有两个版本,1.4.X和1.99.X,前者俗称为sqoop1后者成为sqoop2,然后又有apache和cloudera两种
sqoop1和sqoop2使用方法和命令有较大不同,我先说sqoop1在java中的使用
首先我依赖使用的是这个(我们用的gradle来管理依赖)
compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2')
因为这个包里面和我之前的有冲突,所以我还排除了下面两个包(当时报错信息是找不到方法,然后一搜那个类项目中有两个)
compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2') { exclude group: "org.slf4j", module: "slf4j-log4j12" exclude group: "org.apache.hadoop", module: "hadoop-core" }
sqoop1在java中执行的时候,会使用本地模式去运行mapreduce任务,他会在本地tmp目录下给你生成java文件(我的电脑是在tmp下生成了很多java文件),和你要操作的hdfs集群上安没安sqoop一点关系都没有
因为和hdfs还有hadoop有关,所以你可能还需要这些依赖
hadoop-core 2.6.0-mr1-cdh5.5.2 hadoop-common 2.6.0-cdh5.5.2 hadoop-mapreduce-client-core 2.6.0-cdh5.5.2
我的项目中之前就已经有hadoop云云的依赖了,所以我直接加了sqoop的就可以用了,如果你们要单独测试,需要先在java中把这些都加上,不然运行的时候会有classNotFound错误,注意版本一致性阿
然后就可以写测试了,直接贴代码(hdfs我使用的是我自己本台机器的集群,所以地址是我自己电脑的,也没有认证)
Configuration cOnf= new Configuration(); conf.set("fs.default.name", "hdfs://192.168.2.14:9000/");//设置HDFS服务地址 String[] arg = new String[] {"--connect","jdbc:mysql://114.115.156.37:3306/test", "--username","xxx", "--password","xxx", "--table","persons", "--m","1", "--export-dir","hdfs://10.30.88.46:8020/user/hive/warehouse/dw_api_server.db/persons", "--input-fields-terminated-by","\t" }; String[] expandArguments = OptionsFileUtil.expandArguments(arg); SqoopTool tool = SqoopTool.getTool("export"); Configuration loadPlugins = SqoopTool.loadPlugins(conf); Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins); int res = Sqoop.runSqoop(sqoop, expandArguments); if (res == 0) { System.out.println ("成功"); }else { System.out.println("失败"); }
192.168.2.14就是我本机器的地址,建议不要写localhost,因为我写这个运行不了,尽量写ip地址,也不要写主机名,如果hdfs要过kerberos会有额外的操作以及错误处理,我一会说
下面的getTool,因为我是hdfs导出mysql,所以写的export,如果是反过来的,是import,这个网上命令例子也很多,我就不写了,这里只写导出
这段代码就是将hdfs上的数据导入到mysql中去的,命令必须使用String数组形式操作,中间不用加空格,属性和值记得分开写,从上之下,mysql连接地址,用户名密码,操作的table,-m指用几个机器来执行mr(这个有点记不起来了,可能意思不太对,求纠正),export-dir简单说就是hdfs上的目录,下面那个是指按什么分隔符来插入
persons文件夹下放的是txt文件,或者csv文件,内容用\t分割的,说白了就是文本文件,这样语句才能执行成功,不然会报错
要保证访问hdfs的用户有权限操作,mysql用户也需要有读写权限
hdfs数据插入到mysql中必须类型能够匹配或转换,比如hdfs是字符串的"aaa",mysql里面是int,就会报错
hdfs的文本文件列可以和myslq不一致,他会默认依次丛左到右将hdfs的数据取出来,再从左到右的插入到mysql中去
上面的String数组里还可以加一个属性叫--columns可以指定要插入那列,这里指定的列是以mysql为基准的,但取数据依旧是从左到右的从hdfs中拿,所以可能数据会错位,所以建议结构两边一致,然后要取的时候全取,你们操作的时候可以看下,仅仅针对txt这类的文本文件
hive操作同理,textfile类型表的你就当在操作文本文件就好了
sqoop1是可以解析parquet文件的,但在我试的过程中,有几个前提条件
首先需要多加一个依赖
compile group: 'org.kitesdk', name: 'kite-data-mapreduce', version: '1.1.0'
我怎么知道要加这个,因为在操作parquet文件的时候他给我报classNotFound,就是这个包
解析parquet文件或者hive的parquet表时候,上面的命令只需要将
"--input-fields-terminated-by","\t"
删除即可
解析parquet表,必须在数据的目录下有.metadata文件,不然无法解析,hive的parquet表同理,就是数据文件和.metadata处于一个目录下
它会根据metadata里面的规则去解析parquet文件,metadata里面定义了数据的
目前我之测试了XXX.parquet文件可以解析,其他的没有测试,例如avro,有一种情况不能解析,就是在hive下,表的类型是parquet,但是表里的数据是从别的表insert过来的,那么在hdfs上的文件就变成了0000_0这种样子,这样的解析不了,我在尝试的时候报错,即使有metadata文件,不过parquet表可以执行要导出那几列,不想txt的会错位
我遇到的了下面几种错误
Path is not a file: /user/hive/warehouse/test.db/persons
提示找不到meatadata文件,如果你导出的是parquet表,目录下没有metadata会报这个错,同样没有数据文件会报找不到数据文件
Can not read value at 1 in block 0 in file
hive表是parquet表,而且也有metadata文件,会有这个错误,指不能解析文件,我这个文件下是0000_0这种样子的
classNotFound:java.sun.tools.xxxxxxxx
我第一次运行报了这个错误,需要将你jdk文件中的一个Tools.jar的jar包放到你的项目中去
numberformatexception
出现转换错误都是hive表或者文件内容和mysql结构不一致,无法自动转换的问题,改结构去,解析parquet文件的时候会出现找不到字段错误,可能是因为大小写的原因
Can't get Master Kerberos principal for use as renewer
hdfs需要过kerberos的时候,命名使用keytab文件已经登陆成功了,可是一直出这个问题,参考方法是将hadoop集群上的yarn-site.xml文件放到你项目的resource中去,然后在代码中加入
conf.addResource("yarn-site.xml"); // conf是你的Configuration对象
就没事了,该问题我是参考这里的,我把连接贴出来,感谢一下
http://www.kevin517.win/2017/11/21/%E8%BF%9E%E6%8E%A5%E5%B8%A6%E6%9C%89%20Kerberos%20%E8%AE%A4%E8%AF%81%E7%9A%84%20Hadoop%20%20HBase%20Spark/
其他错误
如果出现其他不知名的错误,请考虑版本是否一致,jar冲突这些问题
--hive和--hcatalog命令
我在找资料的时候有看到这两个命令,直接是hive-》mysql,但是经过试验,--hive显示无此命令,hcatalog命令无效,提示找不到表,也不知道是版本不对还是啥,放弃了暂时
上面的问题我解决了以后,发现打包的时候缺了一个包,具体解决如下
https://blog.csdn.net/u011856283/article/details/80690031
然后是打包以后运行不料sqoop1的程序,显示找不到jar,无法识别的符号,特别费解,最后找到了答案,参考网址如下
https://www.cnblogs.com/claren/p/7240735.html
对于这个问题,也可以在上面代码中使用如下方式解决(未测试)
SqoopOptions optiOns= sqoop.getOptions(); options.setHadoopMapRedHome("/xxx");
===================================================================================================
sqoop2是直接使用程序连接到集群上的sqoop,远程操作,流程是需要先创建link也可以理解程要操作的对象,比如一个link是hdfs,一个link是mysql,有了link后需要创建job,创建job需要指定那两个link进行交互,设置from和to的关系,然后执行job就可以了(我觉得sqoop2更方便)
首先依赖和上面sqoop1的不一样,我们用的是一个sqoop-client的jar,我这里用的是apache的版本
org.apache.sqoop sqoop-client 1.99.7
不需要其他依赖
注意:这个jar的版本请务必和你集群上安装的sqoop版本一致,不然会出莫名的错误
这里贴一下官方的demo地址,官方写的很全面,可以直接参考,我下面的程序也源自这里改编
http://sqoop.apache.org/docs/1.99.6/ClientAPI.html
/** * 创建连接 * @throws Exception */ public static void ImportTest() throws Exception{ String url = "http://localhost:12000/sqoop/"; SqoopClient client = new SqoopClient(url); // ============================================== // create a placeholder for link long cOnnectorId= 1; MLink link = client.createLink("hdfs-connector"); // ============================================================================== link.setName("HDFS"); link.setCreationUser("hadoop"); MLinkConfig linkCOnfig= link.getConnectorLinkConfig(); linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://127.0.0.1:9000"); // ============================================================================== Status status = client.saveLink(link); if(status.canProceed()) { System.out.println("Created Link with Link Id : " + link.getPersistenceId()); } else { System.out.println("Something went wrong creating the link"); } }
这是创造了一个hdfs的link,上面localhost:12000/sqoop指的是你集群上的sqoop
这里有个小问题,官方的demo或者是别的地方找到的资料
MLink link = client.createLink("hdfs-connector");
这句话的createLink中写的都是数字,我这里写的是字符串,这是因为版本问题,最新的版本是不能写数字的,改成了字符串,这里的hdfs-connector指的是创建hdfs的link,瞎写会报错
下面贴一个mysql(通用jdbc)的link
/** * 创建连接 * @throws Exception */ public static void ImportTest() throws Exception{ String url = "http://localhost:12000/sqoop/"; SqoopClient client = new SqoopClient(url); // ============================================== // create a placeholder for link long cOnnectorId= 1; MLink link = client.createLink("generic-jdbc-connector"); // ============================================================================== link.setName("mysql_link"); link.setCreationUser("hahaha"); MLinkConfig linkCOnfig= link.getConnectorLinkConfig(); linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/test"); linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver"); linkConfig.getStringInput("linkConfig.username").setValue("hadoop"); linkConfig.getStringInput("linkConfig.password").setValue("hadoop"); // ============================================================================== Status status = client.saveLink(link); if(status.canProceed()) { System.out.println("Created Link with Link Id : " + link.getPersistenceId()); } else { System.out.println("Something went wrong creating the link"); } }这里换成了
generic-jdbc-connector
意思 是通用的jdbc,也就是说不止mysql,其他支持jdbc的也可以,虽然我没试,前提需要你在集群上的sqoop的lib下加入驱动文件,不然不能用。
然后就是创建一个任务了
/** * 创建任务 */ public static void saveJob() { String url = "http://localhost:12000/sqoop/"; SqoopClient client = new SqoopClient(url); //Creating dummy job object MJob job = client.createJob("mysql", "HDFS"); job.setName("myJobs"); job.setCreationUser("myJobsUser"); // set the "FROM" link job config values MFromConfig fromJobCOnfig= job.getFromJobConfig(); fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("test"); fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("Persons"); fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("Id_P"); // set the "TO" link job config values MToConfig toJobCOnfig= job.getToJobConfig(); toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqoop"); // set the driver config values MDriverConfig driverCOnfig= job.getDriverConfig(); //driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3"); Status status = client.saveJob(job); if(status.canProceed()) { System.out.println("Created Job with Job Id: "+ job.getPersistenceId()); } else { System.out.println("Something went wrong creating the job"); } }
这里设置from和to的关系
MJob job = client.createJob("mysql", "HDFS");
里面的值就是你刚刚创建link填写的name属性,这里指mysql到hdfs
运行任务(这段代码是我找到的,基本没有改)
/** * 启动job */ public static void startJob() { String url = "http://localhost:12000/sqoop/"; SqoopClient client = new SqoopClient(url); MJob job = client.getJob("myJobs"); //启动任务 long jobId = job.getPersistenceId(); MSubmission submission = client.startJob("myJobs"); System.out.println("JOB提交状态为 : " + submission.getStatus()); while(submission.getStatus().isRunning() && submission.getProgress() != -1) { System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100)); //三秒报告一次进度 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("JOB执行结束... ..."); System.out.println("Hadoop任务ID为 :" + submission.getExternalJobId()); Counters counters = submission.getCounters(); if(counters != null) { System.out.println("计数器:"); for(CounterGroup group : counters) { System.out.print("\t"); System.out.println(group.getName()); for(Counter counter : group) { System.out.print("\t\t"); System.out.print(counter.getName()); System.out.print(": "); System.out.println(counter.getValue()); } } } if(submission.getError() != null) { System.out.println("JOB执行异常,异常信息为 : " +submission.getError()); } System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕"); }具体参数和安装过程大家可以去自行了解,java操作大概就是这些了,有错请指正