热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

MR总结(三)-MapReduce组件自定义

自定义InputFormatInputFormat主要包括:?????InputSplit和RecordReader??InputSplit用于定义Map的数目和确定最合适的执行节点(位置)??RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。?一个自定义分片实现要继承与抽

自定义InputFormat InputFormat主要包括: ? ? ? ? ? InputSplit和RecordReader ? ? InputSplit用于定义Map的数目和确定最合适的执行节点(位置) ? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 ? 一个自定义分片实现要继承与抽

自定义InputFormat

InputFormat主要包括:

? ? ? ? ? InputSplit和RecordReader

? ?InputSplit用于定义Map的数目和确定最合适的执行节点(位置)

? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。

? 一个自定义分片实现要继承与抽象类InputSplit,通过定义输入的长度和位置。分片的位置暗示调度器如何是放置一个分片的执行器(即,选择一个合适的TaskTracker)

? JobTracker处理分片的算法大致是:

  1. 通过TaskTracker节点的心跳获取可用的map槽资源
  2. 从排队等候的分片中找出那些可用的节点是本地的
  3. 向TaskTracker提交分片

? 基于存储机制和执行策略,分片的大小和位置是有不同的意思。例如在HDFS上,一个分片和一个物理数据块是一致的,分片的位置是这个数据块的物理存放位置的一个集合。

? 下面是FileInputFormat工作的机制

  1. 继承InputSplit,计算文件的信息。如文件中数据块的开始位置和块的长度
  2. 获取文件的数据块的一个集合
  3. 创建一个数据分片,这个分片的长度和块大小一样,分片位置为这个快的位置。此外还含有文件位置,块位移,长度等信息。

下面是FileInputFormat创建分片的代码:


/**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List splits = new ArrayList();
    List files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocatiOns= ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocatiOns= fs.getFileBlockLocations(file, 0, length);
        }

        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

方法创建分片过程主要做了下面几件事:1、首先从Job对象中获取输入文件的状态信息FileStatus。2.然后针对每个文件获取块信息。3.根据文件是否可分割按照分片大小进行切割,如果不能则不分割。

?案例:实现计算密集型InputFormat

有一种比较常见的MapReduce程序:计算密集型程序。

计算密集型MR程序即指:对于与每一个输入的键值对需要复杂的计算算法去处理,主要特征是每一个map处理函数需要比获取该处理数据更长的时间,至少一个量级。比如人脸识别程序。

如果使用默认的FileInputFormat去处理该类型应用的话,很大情况下会出现部门机器cpu负载过高,而其他的则比较闲。(可以通过ganglia监控分析)

默认情况下由于FileInputFormat的实现会根据数据的本地性来创建分片数据。然而对于计算密集型的程序数据的本地性可能不适合了。那我们应该如何做出改变呢?我们获取所有可用服务器各自计算能力的情况,根据服务器来分配创建分片

因此我们需要重载分片函数。

下面我们重载SequenceFileInputFormat来演示如何实现上述要求:

ComputeIntensiveSequenceFileInputFormat继承SequenceFileInputFormat函数,重载gitSplits函数:

//重写分片函数
 @Override
 public List getSplits(JobContext job) throws IOException {
  String[] servers = getActiveServersList(job);
  if (servers == null)
   return null;
  List splits = new ArrayList();
  List files = listStatus(job);
  int currentServer = 0;
  for (FileStatus file : files) {
   Path path = file.getPath();
   long length = file.getLen();
   if ((length != 0) && isSplitable(job, path)) {
    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long bytesRemaining = length;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
     splits.add(new FileSplit(path, length - bytesRemaining,
       splitSize, new String[] { servers[currentServer] }));
     currentServer = getNextServer(currentServer, servers.length);
     bytesRemaining -= splitSize;
    }
   } else if (length != 0) {
    splits.add(new FileSplit(path, 0, length,
      new String[] { servers[currentServer] }));
    currentServer = getNextServer(currentServer, servers.length);
   }
  }
  // Save the number of input files in the job-conf
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  return splits;
 }
 //获取服务器列表
 private String[] getActiveServersList(JobContext context) {
  String[] servers = null;
  try {
   JobClient jc = new JobClient((JobConf) context.getConfiguration());
   ClusterStatus status = jc.getClusterStatus(true);
   Collection atc = status.getActiveTrackerNames();
   servers = new String[atc.size()];
   int s = 0;
   for (String serverInfo : atc) {
    StringTokenizer st = new StringTokenizer(serverInfo, ":");
    String trackerName = st.nextToken();
    StringTokenizer st1 = new StringTokenizer(trackerName, "_");
    st1.nextToken();
    servers[s++] = st1.nextToken();
   }
  } catch (IOException e) {
   e.printStackTrace();
  }
  return servers;
 }
 //选择一个服务器
 private static int getNextServer(int current, int max) {
  current++;
  if (current >= max)
   current = 0;
  return current;
 }

这个类继承于SequenceFileInputFormat,重写了getSplits()函数。计算分片的和FileInputFormat一样。只是原来数据的物理本地性由可用的服务器资源代替。两个主要函数:
getActiveServersList() 查询集群状态,计算一个可用的服务器名字列表
getNextServer() 获取一个服务器

优化:
上面方案是否就已经很完美,没有问题了呢?答案是否定的。
在上面我们替换了数据物理本地性这个特性,那么会导致更多的数据传输的问题,会给网络io带来压力,影响io性能。

因此我们想到可以把两个策略综合起来。首先放置尽可能多的任务为本地,分发剩下的到其他节点。
下面是实现这个方案的程序ComputeIntensiveLocalizedSequenceFileInputFormat:

@Override
 public List getSplits(JobContext job) throws IOException {
  List originalSplits = super.getSplits(job);
  String[] servers = getActiveServersList(job);
  if (servers == null)
   return null;
  List splits = new ArrayList(
    originalSplits.size());
  int numSplits = originalSplits.size();
  int currentServer = 0;
  for (int i = 0; i 

这里第一步利用父类(FileInputFormat))获取分片来确保数据本地性。对于每一个服务器,首先试着去指定本地的split给它。其他没有本地分片的则随机分配剩下的分片。

总结:

MapReduce的输入格式的重写主要要主要两个组件:InputFormat和Recordreader。本文主要讲述InputFormat的原理及怎么来重写InputFormat,根据业务的特点选择创建分片的策略。

推荐阅读
  • 深入浅出:Hadoop架构详解
    Hadoop作为大数据处理的核心技术,包含了一系列组件如HDFS(分布式文件系统)、YARN(资源管理框架)和MapReduce(并行计算模型)。本文将通过实例解析Hadoop的工作原理及其优势。 ... [详细]
  • 深入解析:主流开源分布式文件系统综述
    本文详细探讨了几款主流的开源分布式文件系统,包括HDFS、MooseFS、Lustre、GlusterFS和CephFS,重点分析了它们的元数据管理和数据一致性机制,旨在为读者提供深入的技术见解。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文探讨了Hive作业中Map任务数量的确定方式,主要涉及HiveInputFormat和CombineHiveInputFormat两种InputFormat的分片计算逻辑。通过调整相关参数,可以有效控制Map任务的数量,进而优化Hive作业的性能。 ... [详细]
  • 深入解析BookKeeper的设计与应用场景
    本文介绍了由Yahoo在2009年开发并于2011年开源的BookKeeper技术。BookKeeper是一种高效且可靠的日志流存储解决方案,广泛应用于需要高性能和强数据持久性的场景。 ... [详细]
  • databasesync适配openGauss使用指导书
    一、database-sync简介database-sync作为一种开源辅助工具,用于数据库之间的表同步,更确切的说法是复制,可以从一个数据库复制表到另一个数据库该工具支持的功能如 ... [详细]
  • 《计算机视觉:算法与应用》第二版初稿上线,全面更新迎接未来
    经典计算机视觉教材《计算机视觉:算法与应用》迎来了其第二版,现已开放初稿下载。本书由Facebook研究科学家Richard Szeliski撰写,自2010年首版以来,一直是该领域的标准参考书。 ... [详细]
  • 初探Hadoop:第一章概览
    本文深入探讨了《Hadoop》第一章的内容,重点介绍了Hadoop的基本概念及其如何解决大数据处理中的关键挑战。 ... [详细]
  • 深入解析Spark核心架构与部署策略
    本文详细探讨了Spark的核心架构,包括其运行机制、任务调度和内存管理等方面,以及四种主要的部署模式:Standalone、Apache Mesos、Hadoop YARN和Kubernetes。通过本文,读者可以深入了解Spark的工作原理及其在不同环境下的部署方式。 ... [详细]
  • 全面解读Apache Flink的核心架构与优势
    Apache Flink作为大数据处理领域的新兴力量,凭借其独特的流处理能力和高效的批处理性能,迅速获得了广泛的关注。本文旨在深入探讨Flink的关键技术特点及其应用场景,为大数据处理提供新的视角。 ... [详细]
  • 深入理解云计算与大数据技术
    本文详细探讨了云计算与大数据技术的关键知识点,包括大数据处理平台、社会网络大数据、城市大数据、工业大数据、教育大数据、数据开放与共享的应用,以及搜索引擎与Web挖掘、推荐技术的研究及应用。文章还涵盖了云计算的基础概念、特点和服务类型分类。 ... [详细]
  • 深入解析:存储技术的演变与发展
    本文探讨了从单机文件系统到分布式文件系统的存储技术发展过程,详细解释了各种存储模型及其特点。 ... [详细]
  • 本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ... [详细]
  • 本文详细记录了 MIT 6.824 课程中 MapReduce 实验的开发过程,包括环境搭建、实验步骤和具体实现方法。 ... [详细]
  • 本文介绍了Hadoop的核心组件,包括高可靠性和高吞吐量的分布式文件系统HDFS、分布式的离线并行计算框架MapReduce、作业调度与集群资源管理框架YARN以及支持其他模块的工具模块Common。 ... [详细]
author-avatar
默一
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有