热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sqoop2API踩坑纪录

sqoop2相对sqoop1,最大的优势就是提供了API方式来调用,这样第三方用户就可以根据自己的逻辑进行订制。这里记录下使用sqoop2将mysql数据导入hdfs,hdfs导

sqoop2 相对 sqoop1,最大的优势就是提供了API方式来调用,这样第三方用户就可以根据自己的逻辑进行订制。这里记录下使用 sqoop2 将 mysql 数据导入 hdfs,hdfs 导出到 mysql 两种数据同步。

相关软件

  1. sqoop 1.99.7
  2. hadoop 2.6.0

相关代码

import org.apache.sqoop.client.SqoopClient;
import org.apache.sqoop.model.*;
import org.apache.sqoop.submission.counter.Counter;
import org.apache.sqoop.submission.counter.CounterGroup;
import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.validation.Status;

import java.util.Arrays;
import java.util.UUID;

public class Main {
    static SqoopClient client;

    public static MLink createMysqlLink() {
        MLink link = client.createLink("generic-jdbc-connector");
        // 随机生成名字,不能过长,否则会报错
        link.setName("jdbc-link" + UUID.randomUUID().toString().substring(0, 10));
        link.setCreationUser("xigua");
        MLinkConfig linkCOnfig= link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://10.9.51.13:3008/data_platform_model");
        linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
        linkConfig.getStringInput("linkConfig.username").setValue("hive");
        linkConfig.getStringInput("linkConfig.password").setValue("hive");
        // 这里必须指定 identifierEnclose, 他默认是双引号,也会报错
        linkConfig.getStringInput("dialect.identifierEnclose").setValue("`");
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }
    }


    public static MLink createHdfsLink() {
        MLink link = client.createLink("hdfs-connector");
        link.setName("hdfs-link" + UUID.randomUUID().toString().substring(0, 10));
        link.setCreationUser("xigua");
        MLinkConfig linkCOnfig= link.getConnectorLinkConfig();
        linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://qabb-qa-hdp-hbase0:8020");
        linkConfig.getStringInput("linkConfig.confDir").setValue("/data/users/huser/hadoop/etc/hadoop/");
        Status status = client.saveLink(link);
        if (status.canProceed()) {
            System.out.println("Created Link with Link Name : " + link.getName());
            return link;
        } else {
            System.out.println("Something went wrong creating the link");
            return null;
        }


    }

    public static String createMysql2HdfsJob(MLink fromLink, MLink toLink) {
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("xigua-job" + UUID.randomUUID());
        job.setCreationUser("xigua");
        MFromConfig fromJobCOnfig= job.getFromJobConfig();

        fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("data_platform_model");
        fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("data_import");
        fromJobConfig.getListInput("fromJobConfig.columnList").setValue(Arrays.asList("id", "owner", "alertUserList", "cron"));
        MToConfig toJobCOnfig= job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/tmp/sqoop-job/" + UUID.randomUUID());
        toJobConfig.getEnumInput("toJobConfig.outputFormat").setValue("TEXT_FILE");
        toJobConfig.getEnumInput("toJobConfig.compression").setValue("NONE");
        toJobConfig.getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
        MDriverConfig driverCOnfig= job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }


    }


    public static String createHdfs2MysqlJob(MLink fromLink, MLink toLink) {
        MJob job = client.createJob(fromLink.getName(), toLink.getName());
        job.setName("xigua-job" + UUID.randomUUID());
        job.setCreationUser("xigua");
        MFromConfig fromJobCOnfig= job.getFromJobConfig();

        fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/tmp/sqoop-job/4/");
        MToConfig toJobCOnfig= job.getToJobConfig();
        toJobConfig.getStringInput("toJobConfig.tableName").setValue("sqoop_jobs");
        toJobConfig.getListInput("toJobConfig.columnList").setValue(Arrays.asList("id", "name", "alerts", "cron"));
        MDriverConfig driverCOnfig= job.getDriverConfig();
        driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(1);

        Status status = client.saveJob(job);
        if (status.canProceed()) {
            System.out.println("Created Job with Job Name: " + job.getName());
            return job.getName();
        } else {
            System.out.println("Something went wrong creating the job");
            return null;
        }


    }

    static void startJob(String jobName) {
        //Job start
        MSubmission submission = client.startJob(jobName);
        System.out.println("Job Submission Status : " + submission.getStatus());
        if (submission.getStatus().isRunning() && submission.getProgress() != -1) {
            System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
        }

        System.out.println("Hadoop job id :" + submission.getExternalJobId());
        System.out.println("Job link : " + submission.getExternalLink());
        Counters counters = submission.getCounters();
        if (counters != null) {
            System.out.println("Counters:");
            for (CounterGroup group : counters) {
                System.out.print("\t");
                System.out.println(group.getName());
                for (Counter counter : group) {
                    System.out.print("\t\t");
                    System.out.print(counter.getName());
                    System.out.print(": ");
                    System.out.println(counter.getValue());
                }
            }
        }

    }

    public static void main(String[] args) {
        // 注意sqoop 后面有个 /,如果没有会报下面的错,非常诡异
        // Exception: org.apache.sqoop.common.SqoopException Message: CLIENT_0004:Unable to find valid Kerberos ticket cache (kinit)
        String url = "http://10.9.57.158:12000/sqoop/";
        client = new SqoopClient(url);
        System.out.println(client);

        MLink mysqlLink = createMysqlLink();
        MLink hdfsLink = createHdfsLink();
        // 先把数据导入 hdfs
        startJob(createMysql2HdfsJob(mysqlLink, hdfsLink));
        // 然后再把数据导回 mysql
        startJob(createHdfs2MysqlJob(hdfsLink, mysqlLink));


    }
}

推荐阅读
  • 本文介绍如何通过Java代码调用阿里云短信服务API来实现短信验证码的发送功能,包括必要的依赖添加和关键代码示例。 ... [详细]
  • java datarow_DataSet  DataTable DataRow 深入浅出
    本篇文章适合有一定的基础的人去查看,最好学习过一定net编程基础在来查看此文章。1.概念DataSet是ADO.NET的中心概念。可以把DataSet当成内存中的数据 ... [详细]
  • 本文探讨了如何使用Scrapy框架构建高效的数据采集系统,以及如何通过异步处理技术提升数据存储的效率。同时,文章还介绍了针对不同网站采用的不同采集策略。 ... [详细]
  • 本文探讨了Android系统中联系人数据库的设计,特别是AbstractContactsProvider类的作用与实现。文章提供了对源代码的详细分析,并解释了该类如何支持跨数据库操作及事务处理。源代码可从官方Android网站下载。 ... [详细]
  • 本文介绍了如何使用Java编程语言实现凯撒密码的加密与解密功能。凯撒密码是一种替换式密码,通过将字母表中的每个字母向前或向后移动固定数量的位置来实现加密。 ... [详细]
  • 我在尝试将组合框转换为具有自动完成功能时遇到了一个问题,即页面上的列表框也被转换成了自动完成下拉框,而不是保持原有的多选列表框形式。 ... [详细]
  • 个人博客:打开链接依赖倒置原则定义依赖倒置原则(DependenceInversionPrinciple,DIP)定义如下:Highlevelmo ... [详细]
  • 本文探讨了如何选择一个合适的序列化版本ID(serialVersionUID),包括使用生成器还是简单的整数,以及在不同情况下应如何处理序列化版本ID。 ... [详细]
  • 本文旨在探讨Swift中的Closure与Objective-C中的Block之间的区别与联系,通过定义、使用方式以及外部变量捕获等方面的比较,帮助开发者更好地理解这两种机制的特点及应用场景。 ... [详细]
  • Kubernetes Services详解
    本文深入探讨了Kubernetes中的服务(Services)概念,解释了如何通过Services实现Pods之间的稳定通信,以及如何管理没有选择器的服务。 ... [详细]
  • 深入理解Java字节码:方法调用详解
    本文详细介绍了Java字节码中的方法调用机制,通过具体示例解析了字节码如何处理方法调用及其参数传递。文章由Mahmoud Anouti撰写,原文链接:https://dzone.com/articles/introduction-to-java-bytecode ... [详细]
  • 使用jQuery与百度地图API实现地址转经纬度功能
    本文详细介绍了如何利用jQuery和百度地图API将地址转换为经纬度,包括申请API密钥、页面构建及核心代码实现。 ... [详细]
  • 页面预渲染适用于主要包含静态内容的页面。对于依赖大量API调用的动态页面,建议采用SSR(服务器端渲染),如Nuxt等框架。更多优化策略可参见:https://github.com/HaoChuan9421/vue-cli3-optimization ... [详细]
  • 如何高效学习鸿蒙操作系统:开发者指南
    本文探讨了开发者如何更有效地学习鸿蒙操作系统,提供了来自行业专家的建议,包括系统化学习方法、职业规划建议以及具体的开发技巧。 ... [详细]
  • Java虚拟机及其发展历程
    Java虚拟机(JVM)是每个Java开发者日常工作中不可或缺的一部分,但其背后的运作机制却往往显得神秘莫测。本文将探讨Java及其虚拟机的发展历程,帮助读者深入了解这一关键技术。 ... [详细]
author-avatar
董福强
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有