热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sqoop1.99.4JAVAAPI操作

sqoop1.99.4JAVAAPI操作如果你是MAVEN项目1<dependency>2<groupI

sqoop1.99.4 JAVA API操作
如果你是MAVEN项目
1 <dependency>
2   <groupId>org.apache.sqoopgroupId>
3     <artifactId>sqoop-clientartifactId>
4     <version>1.99.4version>
5 dependency>

如果你是java项目

导入sqoop1.99.4中shell目录下的lib里面全部jar包就行(不用server中的)


HDFS->MYSQL

package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class HDFSToMysql {
    public static void main(String[] args) {
        sqoopTransfer();
    }
    public static void sqoopTransfer() {
        //初始化
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);
        
        //创建一个源链接 HDFS
        long fromCOnnectorId= 1;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("HDFS connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkCOnfig= fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status fromStatus = client.saveLink(fromLink);
        if(fromStatus.canProceed()) {
         System.out.println("创建HDFS Link成功,ID为: " + fromLink.getPersistenceId());
        } else {
         System.out.println("创建HDFS Link失败");
        }
        //创建一个目的地链接 JDBC
        long toCOnnectorId= 2;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("JDBC connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkCOnfig= toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status toStatus = client.saveLink(toLink);
        if(toStatus.canProceed()) {
         System.out.println("创建JDBC Link成功,ID为: " + toLink.getPersistenceId());
        } else {
         System.out.println("创建JDBC Link失败");
        }
        
        //创建一个任务
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("HDFS to MySQL job");
        job.setCreationUser("admln");
        //设置源链接任务配置信息
        MFromConfig fromJobCOnfig= job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");
        
        //创建目的地链接任务配置信息
        MToConfig toJobCOnfig= job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
        //toJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        // set the driver config values
        //MDriverConfig driverCOnfig= job.getDriverConfig();
        //driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");//这句还没弄明白
        Status status = client.saveJob(job);
        if(status.canProceed()) {
         System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId());
        } else {
         System.out.println("JOB创建失败。");
        }
        
        //启动任务
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
          System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
          //三秒报告一次进度
          try {
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
          System.out.println("计数器:");
          for(CounterGroup group : counters) {
            System.out.print("\t");
            System.out.println(group.getName());
            for(Counter counter : group) {
              System.out.print("\t\t");
              System.out.print(counter.getName());
              System.out.print(": ");
              System.out.println(counter.getValue());
            }
          }
        }
        if(submission.getExceptionInfo() != null) {
          System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo());
        }
        System.out.println("HDFS通过sqoop传输数据到MySQL统计执行完毕");
    }
}

MYSQL->HDFS
package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class MysqlToHDFS {
    public static void main(String[] args) {
        sqoopTransfer();
    }
    public static void sqoopTransfer() {
        //初始化
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);
        
        //创建一个源链接 JDBC
        long fromCOnnectorId= 2;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("JDBC connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkCOnfig= fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status fromStatus = client.saveLink(fromLink);
        if(fromStatus.canProceed()) {
         System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId());
        } else {
         System.out.println("创建JDBC Link失败");
        }
        //创建一个目的地链接HDFS
        long toCOnnectorId= 1;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("HDFS connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkCOnfig= toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status toStatus = client.saveLink(toLink);
        if(toStatus.canProceed()) {
         System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId());
        } else {
         System.out.println("创建HDFS Link失败");
        }
        
        //创建一个任务
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("MySQL to HDFS job");
        job.setCreationUser("admln");
        //设置源链接任务配置信息
        MFromConfig fromJobCOnfig= job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
        MToConfig toJobCOnfig= job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
        MDriverConfig driverCOnfig= job.getDriverConfig();
        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

        Status status = client.saveJob(job);
        if(status.canProceed()) {
         System.out.println("JOB创建成功,ID为: "+ job.getPersistenceId());
        } else {
         System.out.println("JOB创建失败。");
        }
        
        //启动任务
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
          System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
          //三秒报告一次进度
          try {
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalId());
        Counters counters = submission.getCounters();
        if(counters != null) {
          System.out.println("计数器:");
          for(CounterGroup group : counters) {
            System.out.print("\t");
            System.out.println(group.getName());
            for(Counter counter : group) {
              System.out.print("\t\t");
              System.out.print(counter.getName());
              System.out.print(": ");
              System.out.println(counter.getValue());
            }
          }
        }
        if(submission.getExceptionInfo() != null) {
          System.out.println("JOB执行异常,异常信息为 : " +submission.getExceptionInfo());
        }
        System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕");
    }
}

别问为什么没有MYSQL和HBASE、HIVE互导的代码


推荐阅读
  • [root@cloud4conf]#sqoopexport--connectjdbc:mysql:192.168.56.1:3306hive--usernameroot--pas ... [详细]
  • 怎么快速学好大数据开发?
    新如何学习大数据技术?大数据怎么入门?怎么做大数据分析?数据科学需要学习那些技术?大数据的应用前景等等问题,已成为热门大数据领域热门问题,以下是对新手如何学习大数据技术问题的解答! ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
  • 从接触DataX起就有一个疑问,它和Sqoop到底有什么区别,昨天部署好了DataX和Sqoop,就可以对两者进行更深入的了解了。两者从原理上看有点相似,都是解决异构环境的数据交换 ... [详细]
  • 本文目录一览:1、大数据培训课程大纲要学什么课程? ... [详细]
  •   大数据与云计算的就职方向有哪些,其实在找工作的时候,我们不仅要看我们所学的专业名称,更要看看哪些岗位的岗位要求符合我们所学的内容。  例如大数据开发工程师这一职位,其岗位职责包 ... [详细]
  • 数据仓库分层设计(基于Hive)
    1、数据仓库逻辑分层架构先来看数据仓库的逻辑分层架构:分层名称可能不一样,但基本是都是这样想要看懂数据仓库的逻辑分层架构,先要弄懂以下概念 ... [详细]
  • hive中创建带有主键的表要加DISABLE或enable作为约束条件
    问题:在hive中创建一个带有primarykey的表,发现报错。 ... [详细]
  • 先看下面这条语句,它实现的功能是将特定日期的数据从mysql表中直接导入hive$sqoopimport\--connectjdbc:mysql:192.168.xx ... [详细]
author-avatar
昏暗的夜风刮过
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有