热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sqoop1.99.4JAVAAPI操作

sqoop 1.99.4 JAVA API操作。如果你是MAVEN项目,请添加下面的依赖。

sqoop1.99.4 JAVA API操作
如果你是MAVEN项目
1 <dependency>
2   <groupId>org.apache.sqoop</groupId>
3     <artifactId>sqoop-client</artifactId>
4     <version>1.99.4</version>
5 </dependency>

如果你是java项目

导入sqoop1.99.4中shell目录下的lib里面全部jar包就行(不用server中的)


HDFS->MYSQL

package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class HDFSToMysql {
    public static void main(String[] args) {
        sqoopTransfer();
    }

    /**
     * Exports data from HDFS into MySQL through the Sqoop 1.99.4 client API.
     *
     * <p>Steps: create an HDFS "from" link (connector id 1) and a JDBC "to"
     * link (connector id 2), build a job from the two links, save it, start
     * it, then poll progress every three seconds until the submission stops
     * running. Counters and any exception info are printed at the end.
     */
    public static void sqoopTransfer() {
        // Initialize the client against the Sqoop server's REST endpoint.
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        // Create the source link: HDFS (connector id 1 on this server).
        // NOTE: the original text declared "fromCOnnectorId" but used
        // "fromConnectorId" below — a compile error; names are now consistent.
        long fromConnectorId = 1;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("HDFS connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("创建HDFS Link成功,ID为: " + fromLink.getPersistenceId());
        } else {
            System.out.println("创建HDFS Link失败");
        }

        // Create the destination link: JDBC (connector id 2 on this server).
        long toConnectorId = 2;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("JDBC connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("创建JDBC Link成功,ID为: " + toLink.getPersistenceId());
        } else {
            System.out.println("创建JDBC Link失败");
        }

        // Create a job wired from the HDFS link to the JDBC link.
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("HDFS to MySQL job");
        job.setCreationUser("admln");

        // "From" side configuration: the HDFS input directory to export.
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");

        // "To" side configuration: target MySQL schema and table.
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
        // Optional driver tuning (e.g. throttlingConfig.numExtractors) is
        // intentionally left unset here — see MysqlToHDFS for an example.
        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("JOB创建成功,ID为: " + job.getPersistenceId());
        } else {
            System.out.println("JOB创建失败。");
        }

        // Start the job and poll its progress until it stops running.
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        // getProgress() returns -1 when progress is unknown; skip polling then.
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
            // Report progress every three seconds.
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalId());

        // Dump MapReduce counters, if the server reported any.
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("计数器:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if (submission.getExceptionInfo() != null) {
            System.out.println("JOB执行异常,异常信息为 : " + submission.getExceptionInfo());
        }
        System.out.println("HDFS通过sqoop传输数据到MySQL统计执行完毕");
    }
}

MYSQL->HDFS
package org.admln.sqoopOperate;

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.MDriverConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

public class MysqlToHDFS {
    public static void main(String[] args) {
        sqoopTransfer();
    }

    /**
     * Imports data from MySQL into HDFS through the Sqoop 1.99.4 client API.
     *
     * <p>Steps: create a JDBC "from" link (connector id 2) and an HDFS "to"
     * link (connector id 1), build a job from the two links, configure the
     * source table and output directory, save and start the job, then poll
     * progress every three seconds until the submission stops running.
     */
    public static void sqoopTransfer() {
        // Initialize the client against the Sqoop server's REST endpoint.
        String url = "http://hadoop:12000/sqoop/";
        SqoopClient client = new SqoopClient(url);

        // Create the source link: JDBC (connector id 2 on this server).
        // NOTE: the original text declared "fromCOnnectorId" but used
        // "fromConnectorId" below — a compile error; names are now consistent.
        long fromConnectorId = 2;
        MLink fromLink = client.createLink(fromConnectorId);
        fromLink.setName("JDBC connector");
        fromLink.setCreationUser("admln");
        MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
        fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
        fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        fromLinkConfig.getStringInput("linkConfig.username").setValue("hive");
        fromLinkConfig.getStringInput("linkConfig.password").setValue("hive");
        Status fromStatus = client.saveLink(fromLink);
        if (fromStatus.canProceed()) {
            System.out.println("创建JDBC Link成功,ID为: " + fromLink.getPersistenceId());
        } else {
            System.out.println("创建JDBC Link失败");
        }

        // Create the destination link: HDFS (connector id 1 on this server).
        long toConnectorId = 1;
        MLink toLink = client.createLink(toConnectorId);
        toLink.setName("HDFS connector");
        toLink.setCreationUser("admln");
        MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
        toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
        Status toStatus = client.saveLink(toLink);
        if (toStatus.canProceed()) {
            System.out.println("创建HDFS Link成功,ID为: " + toLink.getPersistenceId());
        } else {
            System.out.println("创建HDFS Link失败");
        }

        // Create a job wired from the JDBC link to the HDFS link.
        long fromLinkId = fromLink.getPersistenceId();
        long toLinkId = toLink.getPersistenceId();
        MJob job = client.createJob(fromLinkId, toLinkId);
        job.setName("MySQL to HDFS job");
        job.setCreationUser("admln");

        // "From" side configuration: source schema/table and the column used
        // to partition the import across extractors.
        MFromConfig fromJobConfig = job.getFromJobConfig();
        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
        fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");

        // "To" side configuration: HDFS output directory.
        MToConfig toJobConfig = job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");

        // Driver tuning: number of parallel extractors for the import.
        MDriverConfig driverConfig = job.getDriverConfig();
        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("JOB创建成功,ID为: " + job.getPersistenceId());
        } else {
            System.out.println("JOB创建失败。");
        }

        // Start the job and poll its progress until it stops running.
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob(jobId);
        System.out.println("JOB提交状态为 : " + submission.getStatus());
        // getProgress() returns -1 when progress is unknown; skip polling then.
        while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("进度 : " + String.format("%.2f %%", submission.getProgress() * 100));
            // Report progress every three seconds.
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
        System.out.println("JOB执行结束... ...");
        System.out.println("Hadoop任务ID为 :" + submission.getExternalId());

        // Dump MapReduce counters, if the server reported any.
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("计数器:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }
        if (submission.getExceptionInfo() != null) {
            System.out.println("JOB执行异常,异常信息为 : " + submission.getExceptionInfo());
        }
        System.out.println("MySQL通过sqoop传输数据到HDFS统计执行完毕");
    }
}

别问为什么没有MYSQL和HBASE、HIVE互导的代码


推荐阅读
author-avatar
昏暗的夜风刮过
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有