热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用JobControl控制MapReduce任务

代码结构BeanWritable:往数据库读写使用的beanControlJobTest:JobControl任务控制DBInputFormatApp:将关系型数据库的数据导入HD

代码结构

使用JobControl控制MapReduce任务

BeanWritable:往数据库读写使用的bean

ControlJobTest:JobControl任务控制

DBInputFormatApp:将关系型数据库的数据导入HDFS,其中包含了Map、Reduce,内部静态类

DBOutputFormatApp:将HDFS的结构化数据导入关系型数据库

此处关系型数据库使用Mysql

代码如下

BeanWritable.java

/**
 * 
 */
package com.zhen.controlJobTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

/**
 * JavaBean
 * 需要实现Hadoop序列化接口Writable以及与数据库交互时的序列化接口DBWritable
 * 官方API中解释如下:
 * public class DBInputFormat
 *   extends InputFormat implements Configurable
 * 即Mapper的Key是LongWritable类型,不可改变;Value是继承自DBWritable接口的自定义JavaBean
 * 
 * @author FengZhen
 */
public class BeanWritable implements Writable, DBWritable {

    private int id;
    private String name;
    private double height;

    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getInt(1);
        this.name = resultSet.getString(2);
        this.height = resultSet.getDouble(3);
    }

    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setInt(1, id);
        preparedStatement.setString(2, name);
        preparedStatement.setDouble(3, height);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readInt();
        this.name = dataInput.readUTF();
        this.height = dataInput.readDouble();
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(id);
        dataOutput.writeUTF(name);
        dataOutput.writeDouble(height);
    }

    public void set(int id,String name,double height){
        this.id = id;
        this.name = name;
        this.height = height;
    }
    
    @Override
    public String toString() {
        return id + "\t" + name + "\t" + height;
    }

}

DBInputFormatApp.java

package com.zhen.controlJobTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen 
 * 将mysql数据导入hdfs
 */
public class DBInputFormatApp{

    /**
     * Map
     * 当Map的输出key为LongWritable,value为Text时,reduce可以省略不写,默认reduce也是输出LongWritable:Text
     * */
    public static class DBInputMapper extends Mapper {

        private LongWritable outputKey;
        private Text outputValue;

        @Override
        protected void setup(Mapper.Context context)
                throws IOException, InterruptedException {
            this.outputKey = new LongWritable();
            this.outputValue = new Text();
        }
        
        @Override
        protected void map(LongWritable key, BeanWritable value,
                Mapper.Context context)
                throws IOException, InterruptedException {
            outputKey.set(key.get());;
            outputValue.set(value.toString());
            context.write(outputKey, outputValue);
        }

    }
}

DBOutputFormatApp.java

package com.zhen.controlJobTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * @author FengZhen
 * 将hdfs数据导入mysql
 * 使用DBOutputFormat将HDFS路径下的结构化数据写入mysql中,结构化数据如下,第一列为key,后边三列为数据
 * 0    1    Enzo    180.66
 * 1    2    Din    170.666
 * 
 */
public class DBOutputFormatApp{
    
    public static class DBOutputMapper extends Mapper{
        private NullWritable outputKey;
        private BeanWritable outputValue;

        @Override
        protected void setup(Mapper.Context context)
                throws IOException, InterruptedException {
            this.outputKey = NullWritable.get();
            this.outputValue = new BeanWritable();
        }
        @Override
        protected void map(LongWritable key, Text value,
                Mapper.Context context)
                throws IOException, InterruptedException {
            //插入数据库成功的计数器
            final Counter successCounter = context.getCounter("exec", "successfully");
            //插入数据库失败的计数器
            final Counter faildCounter = context.getCounter("exec", "faild");
            //解析结构化数据
            String[] fields = value.toString().split("\t");
            //DBOutputFormatApp这个MapReduce应用导出的数据包含long类型的key,所以忽略key从1开始
            if (fields.length > 3) {
                int id = Integer.parseInt(fields[1]);
                String name = fields[2];
                double height = Double.parseDouble(fields[3]);
                this.outputValue.set(id, name, height);
                context.write(outputKey, outputValue);
                //如果插入数据库成功则递增1,表示成功计数
                successCounter.increment(1L);
            }else{
                //如果插入数据库失败则递增1,表示失败计数
                faildCounter.increment(1L);
            }
            
        }
    }
    
    /**
     * 输出的key必须是继承自DBWritable的类型,DBOutputFormat要求输出的key必须是DBWritable类型
     * */
    public static class DBOutputReducer extends Reducer{
        @Override
        protected void reduce(NullWritable key, Iterable values,
                Reducer.Context context)
                throws IOException, InterruptedException {
            for (BeanWritable beanWritable : values) {
                context.write(beanWritable, key);
            }
        }
    }
    
    
}

ControlJobTest.java

/**
 * 
 */
package com.zhen.controlJobTest;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zhen.controlJobTest.DBInputFormatApp.DBInputMapper;
import com.zhen.controlJobTest.DBOutputFormatApp.DBOutputMapper;
import com.zhen.controlJobTest.DBOutputFormatApp.DBOutputReducer;

/**
 * @author FengZhen
 *
 */
public class ControlJobTest {

    public static void main(String[] args) throws IOException {
        //第一个任务,mysql导入到HDFS
        Configuration configuration1 = new Configuration();
        //配置当前作业需要使用的JDBC配置
        DBConfiguration.configureDB(configuration1, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/hadoop",
                "root", "123qwe");
        Job job1 = Job.getInstance(configuration1, DBInputFormatApp.class.getSimpleName());

        job1.setJarByClass(DBInputFormatApp.class);
        job1.setMapperClass(DBInputMapper.class);
        job1.setMapOutputKeyClass(LongWritable.class);
        job1.setMapOutputValueClass(Text.class);

        job1.setOutputKeyClass(LongWritable.class);
        job1.setOutputValueClass(Text.class);

        //配置作业的输入数据格式
        job1.setInputFormatClass(DBInputFormat.class);
        //配置当前作业需要查询的sql语句及接收sql语句的bean
        DBInputFormat.setInput(
                job1, 
                BeanWritable.class, 
                "select * from people", 
                "select count(1) from people");
        
        FileOutputFormat.setOutputPath(job1, new Path(args[0]));
        
        //第二个任务 HDFS导出到mysql
        
        Configuration configuration2 = new Configuration();
        //在创建Configuration的时候紧接着配置数据库连接信息
        DBConfiguration.configureDB(configuration2, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
        Job job2 = Job.getInstance(configuration2, DBOutputFormatApp.class.getSimpleName());
        job2.setJarByClass(DBOutputFormatApp.class);
        job2.setMapperClass(DBOutputMapper.class);
        job2.setMapOutputKeyClass(NullWritable.class);
        job2.setMapOutputValueClass(BeanWritable.class);
        
        job2.setReducerClass(DBOutputReducer.class);
        job2.setOutputFormatClass(DBOutputFormat.class);
        job2.setOutputKeyClass(BeanWritable.class);
        job2.setOutputValueClass(NullWritable.class);
        
        job2.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job2, args[0]);
        //配置当前作业输出到数据库表、字段信息
        DBOutputFormat.setOutput(job2, "people", new String[]{"id","name","height"});
        
        ControlledJob controlledJob1 = new ControlledJob(configuration1);
        controlledJob1.setJob(job1);
        
        ControlledJob controlledJob2 = new ControlledJob(configuration2);
        controlledJob2.setJob(job2);

        //如果两个任务有依赖关系,必须设置此选项
        controlledJob2.addDependingJob(controlledJob1);
        
        JobControl jobControl = new JobControl("groupName");
        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);
        jobControl.run();
        
        while(true){
            boolean allFinished = jobControl.allFinished();
            if (allFinished) {
                System.exit(0);
            }
        }
        
    }
    
}

mysql表结构如下

CREATE TABLE `people` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `height` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

先插入测试数据

然后将代码打包为jar,传到服务器,执行任务

hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/JobControlTest.jar com.zhen.controlJobTest.ControlJobTest 
/user/hadoop/mapreduce/mysqlToHdfs/people

此任务包含了两个子任务,一个是将mysql数据导入HDFS,一个是将HDFS的数据导出Mysql,也可以写个简单的mapreduce任务来测试。

如果两个子任务有依赖关系,那么必须要设置

controlledJob2.addDependingJob(controlledJob1);

说明job2依赖于job1,当job1执行完之后才会去执行job2.

 


推荐阅读
  • importorg.apache.hadoop.hdfs.DistributedFileSystem;导入方法依赖的package包类privatevoidtestHSyncOpe ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • React基础篇一 - JSX语法扩展与使用
    本文介绍了React基础篇一中的JSX语法扩展与使用。JSX是一种JavaScript的语法扩展,用于描述React中的用户界面。文章详细介绍了在JSX中使用表达式的方法,并给出了一个示例代码。最后,提到了JSX在编译后会被转化为普通的JavaScript对象。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文介绍了禅道作为一款国产开源免费的测试管理工具的特点和功能,并提供了禅道的搭建和调试方法。禅道是一款B/S结构的项目管理工具,可以实现组织管理、后台管理、产品管理、项目管理和测试管理等功能。同时,本文还介绍了其他软件测试相关工具,如功能自动化工具和性能自动化工具,以及白盒测试工具的使用。通过本文的阅读,读者可以了解禅道的基本使用方法和优势,从而更好地进行测试管理工作。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • Tomcat安装与配置教程及常见问题解决方法
    本文介绍了Tomcat的安装与配置教程,包括jdk版本的选择、域名解析、war文件的部署和访问、常见问题的解决方法等。其中涉及到的问题包括403问题、数据库连接问题、1130错误、2003错误、Java Runtime版本不兼容问题以及502错误等。最后还提到了项目的前后端连接代码的配置。通过本文的指导,读者可以顺利完成Tomcat的安装与配置,并解决常见的问题。 ... [详细]
  • 本文整理了Java中org.apache.solr.common.SolrDocument.setField()方法的一些代码示例,展示了SolrDocum ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
author-avatar
秋日里的一抹阳光_797
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有