热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

执行MapReduce原码分析

job提交:   publicvoidsubmit()throwsIOException,InterruptedException,ClassNotFoundException{e

job提交:

 


执行MapReduce-原码分析
 

 

  public void submit() throws IOException, InterruptedException, 
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    
    // Connect to the JobTracker and submit the job
    connect();
    info =  jobClient.submitJobInternal(conf);;
    super.setJobID(info.getID());
    state = JobState.RUNNING;
   }

//创建一个client链接
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

// 初始化个两个client
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.cOnf= conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

//返回 client,这里判断了 是执行的是本地模式,还是RPC模式
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
         
      }
    }

 

 

job提交 jobClient.submitJobInternal(conf)

 

1.获得job运行时临时文件的地址,在hdfs上构造,之后会将一些运行时的信息写在这个文件中,

默认值是:/tmp/hadoop/mapred/staging  一般在使用的是配置中的:mapreduce.jobtracker.staging.root.dir

原码如下:

 

LocalJobRunner implements ClientProtocol 

 

RunningJob submitJobInternal(final JobConf job
                               ) 

        Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
            jobCopy);



  /**
   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
   */
  public String getStagingAreaDir() throws IOException {
    Path stagingRootDir = 
      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
        "/tmp/hadoop/mapred/staging"));
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    String user;
    randid = rand.nextInt(Integer.MAX_VALUE);
    if (ugi != null) {
      user = ugi.getShortUserName() + randid;
    } else {
      user = "dummy" + randid;
    }
    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
  }

 

 

2.获得一个新的jobID,

本地文件+随机数+jobid:

 

 

  public synchronized JobID getNewJobId() {
    return new JobID("local" + randid, ++jobid);
  }

 

 

 

 3.构造  submitJobDir  使用的    1中返回的目录拼接jobid,并将这个值设置给当前job运行目录地址:

 

 

Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
        JobStatus status = null;

 

 

4.添加认证和**信息

     a.从配置文件中读取token信息,如果没有之添加**信息即可

        在这里 将token和secret信息初始化到jobconf中了

 

populateTokenCache(jobCopy, jobCopy.getCredentials());

 5.拷贝client文件到hdfs

 

将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所得的输入文件)复制到一个以作业ID命名的目录下jobtracker的文件系统。

 包含: -libjars, -files, -archives 三种类型的文件

这里有一个副本数量 默认是10 ,可以配置,

 

 

copyAndConfigureFiles(jobCopy, submitJobDir);

* configure the jobconf of the user with the command line options of 
   * -libjars, -files, -archives

private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir) 
  throws IOException, InterruptedException {
    short replication = (short)job.getInt("mapred.submit.replication", 10);
    copyAndConfigureFiles(job, jobSubmitDir, replication);

    // Set the working directory
    if (job.getWorkingDirectory() == null) {
      job.setWorkingDirectory(fs.getWorkingDirectory());          
    }
  }

 

 

 

6.通过namenode获得token

 

TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                              new Path [] {submitJobDir},
                                              jobCopy);

 

 

7.初始化job执行时需要的文件路径信息 ,并将这些信息存放在 conf中

 

Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          int reduces = jobCopy.getNumReduceTasks();
          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }
          JobContext cOntext= new JobContext(jobCopy, jobId);

 

8.检查输出文件信息,在这里我们会看到,如果输出目录不做设置或者输出目录已经存在的话就会报错了,系统就会退出

 

// Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() : 
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }



  public void checkOutputSpecs(JobContext job
                               ) throws FileAlreadyExistsException, IOException{
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null) {
      throw new InvalidJobConfException("Output directory not set.");
    }
    
    // get delegation token for outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
                                        new Path[] {outDir}, 
                                        job.getConfiguration());

    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
      throw new FileAlreadyExistsException("Output directory " + outDir + 
                                           " already exists");
    }
  }

 

9.开始对输入文件做分片处理:

   这里需要说明一下 其中writeNewSplits  主哦功能 调用了List splits = input.getSplits(job); 这里就是我们在看的哦啊wordcount中 FileInputFormat中getSplits(conf)被调用的地方,可以看到map的数量就是有分片的数量决定的,具体分片操作参考:

http://younglibin.iteye.com/blog/1929255

http://younglibin.iteye.com/blog/1929278

 

 // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jCOnf= (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

  private 
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration cOnf= job.getConfiguration();
    InputFormat input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

 

 

10.将将要执行的任务队列提交 到管理队列中 

 

// write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = jobCopy.getQueueName();
          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
          jobCopy.set(QueueManager.toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

 

 

11.将这些文件的信息提交给job,在job执行的根据这写配置来获取文件内容

 

// Write job file to JobTracker's fs        
          FSDataOutputStream out = 
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

 

12. 将这写配置信息 输出到 文件中,我们可以在job运行的临时目录下看到有个job.xml文件 这个文件中存放了关于这个job的所有配置信息,也可以通过50030端口,查看到这个文件;

 

jobCopy.writeXml(out);

 

job的初始化完成了,接下来就是job的执行了

 

13.终于开始提交job任务了

 

 

status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials());

  /**
   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
   */
public JobStatus submitJob(JobID jobid, String jobSubmitDir, 
                             Credentials credentials) 
  throws IOException {
    Job job = new Job(jobid, jobSubmitDir);
    job.job.setCredentials(credentials);
    return job.status;
  }

//

 

以上实现使用的是 一个local方式的,Job是 LocalJobRunner 的一个自己的类,
这个类 继承了一个Thread ,是多线程:

 

 

 private class Job extends Thread implements TaskUmbilicalProtocol {



public Job(JobID jobid, String jobSubmitDir) throws IOException {
     
      
      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                               "http://localhost:8080/", job.getJobName());
      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
          profile.getUser(), profile.getJobName(), profile.getJobFile(), 
          profile.getURL().toString());
      jobs.put(id, this);
      this.start();
    }
 @Override
 public void run() {
      ...............
        List taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
            jobId, mapOutputFiles);
        ExecutorService mapService = createMapExecutor(taskRunnables.size());

        // Start populating the executor with work units.
        // They may begin running immediately (in other threads).
        for (Runnable r : taskRunnables) {
          mapService.submit(r);
        }
.........................
              reduce.setJobFile(localJobFile.toString());
              localConf.setUser(reduce.getUser());
              reduce.localizeConfiguration(localConf);
              reduce.setConf(localConf);
              reduce_tasks += 1;
              myMetrics.launchReduce(reduce.getTaskID());
              reduce.run(localConf, this);
              myMetrics.completeReduce(reduce.getTaskID());
              reduce_tasks -= 1;
          
          }
        
    }
 

 

job中调用 Map 和reduce 

 

map:

 我们看到在job线程中执行了 mapService.submit(r); 中的r 是 MapTaskRunnable 对象,所以这里真正提交了 map人物执行

protected class MapTaskRunnable implements Runnable {
      public void run() {
            map_tasks.getAndIncrement();
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);
            myMetrics.completeMap(mapId);
          
    }

 

 

 

 

我们看到上边方法调用了 MapTask类 的 run

 

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    ..............................................
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }


@SuppressWarnings("unchecked")
  private 
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    // make a mapper
    // make the input format
    // rebuild the input split
    // get an output object
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    output.close(mapperContext);
  }

input.initialize(split, mapperContext);

调用的是: LineRecordReader
我们知道 FileInputForamt  的 子类 默认使用了 TextInputFormat  在  TextInputFormat 中我们构造了  return new LineRecordReader(recordDelimiterBytes);

所有我们在读取数据的时候我们使用的是: LineRecordReader

 

 

以上代码 有一段是  mapper.run(mapperContext); 在这里我们终于知道 谁调用了 这个run方法了吧,到这里,一个本地运行的maoreduce就可以串起来了

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }

 

 reduce:

reduce.run(localConf, this);

 

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
    done(umbilical, reporter);
  }

private 
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator comparator,
                     Class keyClass,
                     Class valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    reducer.run(reducerContext);
    trackedRW.close(reducerContext);
  }

 

 

在上比那 我们也看到了 reduce  调用 reducer.run  的地方, 终于把一个流程串起来了 。

 

 

 


推荐阅读
  • importorg.apache.hadoop.hdfs.DistributedFileSystem;导入方法依赖的package包类privatevoidtestHSyncOpe ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • 本文讨论了clone的fork与pthread_create创建线程的不同之处。进程是一个指令执行流及其执行环境,其执行环境是一个系统资源的集合。在调用系统调用fork创建一个进程时,子进程只是完全复制父进程的资源,这样得到的子进程独立于父进程,具有良好的并发性。但是二者之间的通讯需要通过专门的通讯机制,另外通过fork创建子进程系统开销很大。因此,在某些情况下,使用clone或pthread_create创建线程可能更加高效。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • 精讲代理设计模式
    代理设计模式为其他对象提供一种代理以控制对这个对象的访问。代理模式实现原理代理模式主要包含三个角色,即抽象主题角色(Subject)、委托类角色(被代理角色ÿ ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • java布尔字段用is前缀_POJO类中布尔类型的变量都不要加is前缀详解
    前言对应阿里巴巴开发手册第一章的命名风格的第八条。【强制】POJO类中布尔类型的变量都不要加is前缀,否则部分框架解析会引起序列化错误。反例:定义为基本 ... [详细]
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
  • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
author-avatar
曾静ZHH_423
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有