热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sqoop的java操作,总结归纳,含代码

(下面说的操作hdfs其实和操作hive意思一样,都是文件夹)最近要在项目中加一个sqoop的功能,需求是将hive的数据导入至mysql,也就是export功能由于之前没用过sqo

(下面说的操作hdfs其实和操作hive意思一样,都是文件夹)

最近要在项目中加一个sqoop的功能,需求是将hive的数据导入至mysql,也就是export功能

由于之前没用过sqoop,所以特地去学习怎么使用,这里总结下这两天了解到的简单内容

首先sqoop有两个版本,1.4.X和1.99.X,前者俗称为sqoop1后者成为sqoop2,然后又有apache和cloudera两种

sqoop1和sqoop2使用方法和命令有较大不同,我先说sqoop1在java中的使用

首先我依赖使用的是这个(我们用的gradle来管理依赖)

compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2')

因为这个包里面和我之前的有冲突,所以我还排除了下面两个包(当时报错信息是找不到方法,然后一搜那个类项目中有两个)

compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2') {
        exclude group: "org.slf4j", module: "slf4j-log4j12"
        exclude group: "org.apache.hadoop", module: "hadoop-core"
    }

sqoop1在java中执行的时候,会使用本地模式去运行mapreduce任务,他会在本地tmp目录下给你生成java文件(我的电脑是在tmp下生成了很多java文件),和你要操作的hdfs集群上安没安sqoop一点关系都没有

因为和hdfs还有hadoop有关,所以你可能还需要这些依赖

hadoop-core 2.6.0-mr1-cdh5.5.2

hadoop-common 2.6.0-cdh5.5.2

hadoop-mapreduce-client-core 2.6.0-cdh5.5.2

我的项目中之前就已经有hadoop云云的依赖了,所以我直接加了sqoop的就可以用了,如果你们要单独测试,需要先在java中把这些都加上,不然运行的时候会有classNotFound错误,注意版本一致性阿

然后就可以写测试了,直接贴代码(hdfs我使用的是我自己本台机器的集群,所以地址是我自己电脑的,也没有认证)

Configuration cOnf= new Configuration();
conf.set("fs.default.name", "hdfs://192.168.2.14:9000/");//设置HDFS服务地址
String[] arg = new String[] {"--connect","jdbc:mysql://114.115.156.37:3306/test",
                "--username","xxx",
                "--password","xxx",
                "--table","persons",
                "--m","1",
                "--export-dir","hdfs://10.30.88.46:8020/user/hive/warehouse/dw_api_server.db/persons",
                "--input-fields-terminated-by","\t"
        		};
String[] expandArguments = OptionsFileUtil.expandArguments(arg);
        SqoopTool tool = SqoopTool.getTool("export");
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);

        int res = Sqoop.runSqoop(sqoop, expandArguments);
        if (res == 0) {
            System.out.println ("成功");
        }else {
        	System.out.println("失败");
        }

192.168.2.14就是我本机器的地址,建议不要写localhost,因为我写这个运行不了,尽量写ip地址,也不要写主机名,如果hdfs要过kerberos会有额外的操作以及错误处理,我一会说

下面的getTool,因为我是hdfs导出mysql,所以写的export,如果是反过来的,是import,这个网上命令例子也很多,我就不写了,这里只写导出

这段代码就是将hdfs上的数据导入到mysql中去的,命令必须使用String数组形式操作,中间不用加空格,属性和值记得分开写,从上之下,mysql连接地址,用户名密码,操作的table,-m指用几个机器来执行mr(这个有点记不起来了,可能意思不太对,求纠正),export-dir简单说就是hdfs上的目录,下面那个是指按什么分隔符来插入

几点问题

persons文件夹下放的是txt文件,或者csv文件,内容用\t分割的,说白了就是文本文件,这样语句才能执行成功,不然会报错

文本文件的字段列数和mysql一致

要保证访问hdfs的用户有权限操作,mysql用户也需要有读写权限

hdfs数据插入到mysql中必须类型能够匹配或转换,比如hdfs是字符串的"aaa",mysql里面是int,就会报错

缺点

hdfs的文本文件列可以和myslq不一致,他会默认依次丛左到右将hdfs的数据取出来,再从左到右的插入到mysql中去

上面的String数组里还可以加一个属性叫--columns可以指定要插入那列,这里指定的列是以mysql为基准的,但取数据依旧是从左到右的从hdfs中拿,所以可能数据会错位,所以建议结构两边一致,然后要取的时候全取,你们操作的时候可以看下,仅仅针对txt这类的文本文件

hive操作同理,textfile类型表的你就当在操作文本文件就好了


关于parquet文件,或者说hive的parquet表

sqoop1是可以解析parquet文件的,但在我试的过程中,有几个前提条件

首先需要多加一个依赖

compile group: 'org.kitesdk', name: 'kite-data-mapreduce', version: '1.1.0'

我怎么知道要加这个,因为在操作parquet文件的时候他给我报classNotFound,就是这个包

解析parquet文件或者hive的parquet表时候,上面的命令只需要将

"--input-fields-terminated-by","\t"

删除即可

解析parquet表,必须在数据的目录下有.metadata文件,不然无法解析,hive的parquet表同理,就是数据文件和.metadata处于一个目录下

它会根据metadata里面的规则去解析parquet文件,metadata里面定义了数据的

目前我之测试了XXX.parquet文件可以解析,其他的没有测试,例如avro,有一种情况不能解析,就是在hive下,表的类型是parquet,但是表里的数据是从别的表insert过来的,那么在hdfs上的文件就变成了0000_0这种样子,这样的解析不了,我在尝试的时候报错,即使有metadata文件,不过parquet表可以执行要导出那几列,不想txt的会错位

报错

我遇到的了下面几种错误

Path is not a file: /user/hive/warehouse/test.db/persons

提示找不到meatadata文件,如果你导出的是parquet表,目录下没有metadata会报这个错,同样没有数据文件会报找不到数据文件

Can not read value at 1 in block 0 in file

hive表是parquet表,而且也有metadata文件,会有这个错误,指不能解析文件,我这个文件下是0000_0这种样子的

classNotFound:java.sun.tools.xxxxxxxx

我第一次运行报了这个错误,需要将你jdk文件中的一个Tools.jar的jar包放到你的项目中去

numberformatexception

出现转换错误都是hive表或者文件内容和mysql结构不一致,无法自动转换的问题,改结构去,解析parquet文件的时候会出现找不到字段错误,可能是因为大小写的原因

Can't get Master Kerberos principal for use as renewer

hdfs需要过kerberos的时候,命名使用keytab文件已经登陆成功了,可是一直出这个问题,参考方法是将hadoop集群上的yarn-site.xml文件放到你项目的resource中去,然后在代码中加入

conf.addResource("yarn-site.xml"); // conf是你的Configuration对象

就没事了,该问题我是参考这里的,我把连接贴出来,感谢一下

http://www.kevin517.win/2017/11/21/%E8%BF%9E%E6%8E%A5%E5%B8%A6%E6%9C%89%20Kerberos%20%E8%AE%A4%E8%AF%81%E7%9A%84%20Hadoop%20%20HBase%20Spark/

其他错误

如果出现其他不知名的错误,请考虑版本是否一致,jar冲突这些问题


--hive和--hcatalog命令

我在找资料的时候有看到这两个命令,直接是hive-》mysql,但是经过试验,--hive显示无此命令,hcatalog命令无效,提示找不到表,也不知道是版本不对还是啥,放弃了暂时


关于打包项目出现的问题

上面的问题我解决了以后,发现打包的时候缺了一个包,具体解决如下

logredactor1.0.3

https://blog.csdn.net/u011856283/article/details/80690031

然后是打包以后运行不料sqoop1的程序,显示找不到jar,无法识别的符号,特别费解,最后找到了答案,参考网址如下

https://www.cnblogs.com/claren/p/7240735.html

对于这个问题,也可以在上面代码中使用如下方式解决(未测试)

SqoopOptions optiOns= sqoop.getOptions();
        options.setHadoopMapRedHome("/xxx");

===================================================================================================

sqoop2

sqoop2是直接使用程序连接到集群上的sqoop,远程操作,流程是需要先创建link也可以理解程要操作的对象,比如一个link是hdfs,一个link是mysql,有了link后需要创建job,创建job需要指定那两个link进行交互,设置from和to的关系,然后执行job就可以了(我觉得sqoop2更方便)

首先依赖和上面sqoop1的不一样,我们用的是一个sqoop-client的jar,我这里用的是apache的版本


    org.apache.sqoop
    sqoop-client
    1.99.7

不需要其他依赖

注意:这个jar的版本请务必和你集群上安装的sqoop版本一致,不然会出莫名的错误

这里贴一下官方的demo地址,官方写的很全面,可以直接参考,我下面的程序也源自这里改编

http://sqoop.apache.org/docs/1.99.6/ClientAPI.html

/**
	 * 创建连接
	 * @throws Exception
	 */
	public static void ImportTest() throws Exception{
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		// ==============================================
		// create a placeholder for link		
		long cOnnectorId= 1;
		MLink link = client.createLink("hdfs-connector"); 
		// ==============================================================================
		link.setName("HDFS");
		link.setCreationUser("hadoop");
		MLinkConfig linkCOnfig= link.getConnectorLinkConfig();
		linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://127.0.0.1:9000");
		// ==============================================================================
		Status status = client.saveLink(link);
		if(status.canProceed()) {
		 System.out.println("Created Link with Link Id : " + link.getPersistenceId());
		} else {
		 System.out.println("Something went wrong creating the link");
		}
	}

这是创造了一个hdfs的link,上面localhost:12000/sqoop指的是你集群上的sqoop

这里有个小问题,官方的demo或者是别的地方找到的资料

MLink link = client.createLink("hdfs-connector"); 

这句话的createLink中写的都是数字,我这里写的是字符串,这是因为版本问题,最新的版本是不能写数字的,改成了字符串,这里的hdfs-connector指的是创建hdfs的link,瞎写会报错

下面贴一个mysql(通用jdbc)的link

	/**
	 * 创建连接
	 * @throws Exception
	 */
	public static void ImportTest() throws Exception{
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		// ==============================================
		// create a placeholder for link		
		long cOnnectorId= 1;
		MLink link = client.createLink("generic-jdbc-connector");
		// ==============================================================================
		link.setName("mysql_link");
		link.setCreationUser("hahaha");
		MLinkConfig linkCOnfig= link.getConnectorLinkConfig();
		linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/test");
		linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
		linkConfig.getStringInput("linkConfig.username").setValue("hadoop");
		linkConfig.getStringInput("linkConfig.password").setValue("hadoop");
		// ==============================================================================
		Status status = client.saveLink(link);
		if(status.canProceed()) {
		 System.out.println("Created Link with Link Id : " + link.getPersistenceId());
		} else {
		 System.out.println("Something went wrong creating the link");
		}
	}
这里换成了
generic-jdbc-connector

意思 是通用的jdbc,也就是说不止mysql,其他支持jdbc的也可以,虽然我没试,前提需要你在集群上的sqoop的lib下加入驱动文件,不然不能用。

然后就是创建一个任务了

/**
	 * 创建任务
	 */
	public static void saveJob() {
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		//Creating dummy job object
		MJob job = client.createJob("mysql", "HDFS");
		job.setName("myJobs");
		job.setCreationUser("myJobsUser");
		// set the "FROM" link job config values
		MFromConfig fromJobCOnfig= job.getFromJobConfig();
		fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("test");
		fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("Persons");
		fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("Id_P");
		// set the "TO" link job config values
		MToConfig toJobCOnfig= job.getToJobConfig();
		toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqoop");
		// set the driver config values
		MDriverConfig driverCOnfig= job.getDriverConfig();
		//driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

		Status status = client.saveJob(job);
		if(status.canProceed()) {
		 System.out.println("Created Job with Job Id: "+ job.getPersistenceId());
		} else {
		 System.out.println("Something went wrong creating the job");
		}
	}

这里设置from和to的关系

MJob job = client.createJob("mysql", "HDFS");

里面的值就是你刚刚创建link填写的name属性,这里指mysql到hdfs

运行任务(这段代码是我找到的,基本没有改)

/**
	 * 启动job
	 */
	public static void startJob() {
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		MJob job = client.getJob("myJobs");
		//启动任务
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob("myJobs");
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
          System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
          //三秒报告一次进度
          try {
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalJobId());
        Counters counters = submission.getCounters();
        if(counters != null) {
          System.out.println("计数器:");
          for(CounterGroup group : counters) {
            System.out.print("\t");
            System.out.println(group.getName());
            for(Counter counter : group) {
              System.out.print("\t\t");
              System.out.print(counter.getName());
              System.out.print(": ");
              System.out.println(counter.getValue());
            }
          }
        }
        if(submission.getError() != null) {
          System.out.println("JOB执行异常,异常信息为 : " +submission.getError());
        }
        System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕");
	}
具体参数和安装过程大家可以去自行了解,java操作大概就是这些了,有错请指正

推荐阅读
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • Nacos 0.3 数据持久化详解与实践
    本文详细介绍了如何将 Nacos 0.3 的数据持久化到 MySQL 数据库,并提供了具体的步骤和注意事项。 ... [详细]
  • PHP 5.5.31 和 PHP 5.6.17 安全更新发布
    PHP 5.5.31 和 PHP 5.6.17 已正式发布,主要包含多个安全修复。强烈建议所有用户尽快升级至最新版本以确保系统安全。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 如何在Linux服务器上配置MySQL和Tomcat的开机自动启动
    在Linux服务器上部署Web项目时,通常需要确保MySQL和Tomcat服务能够随系统启动而自动运行。本文将详细介绍如何在Linux环境中配置MySQL和Tomcat的开机自启动,以确保服务的稳定性和可靠性。通过合理的配置,可以有效避免因服务未启动而导致的项目故障。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • 本文详细介绍了在Linux系统上编译安装MySQL 5.5源码的步骤。首先,通过Yum安装必要的依赖软件包,如GCC、GCC-C++等,确保编译环境的完备。接着,下载并解压MySQL 5.5的源码包,配置编译选项,进行编译和安装。最后,完成安装后,进行基本的配置和启动测试,确保MySQL服务正常运行。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
author-avatar
PHPdudu
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有