热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

MR总结(一)-Mapreduce原理解析

本文主要内容:★理解MapReduce基本原理★了解MapReduce应用的执行★理解MapReduce应用的设计一、了解MapReduceMapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。用户的任务是实现mapper与reducer,这两

本文主要内容: ★理解MapReduce基本原理 ★了解MapReduce应用的执行 ★理解MapReduce应用的设计 一、了解MapReduce MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。 用户的任务是实现mapper与reducer,这两

本文主要内容:

★理解MapReduce基本原理

★了解MapReduce应用的执行

★理解MapReduce应用的设计

一、了解MapReduce

MapReduce是一个框架,能够利用许多普通计算机对大规模的数据集进行高并发的、分布式的算法处理。

用户的任务是实现mapper与reducer,这两个类会继承Hadoop提供的基础类来解决特殊的问题。就像图3-1中所示的,Mapper将key/value(k1,v1)键值对形式的数据作为输入,然后将他们转变为另一种key/value键值对形式。MapReduce框架对所有mapper输出的key/value进行排序,并将key值相同的所有value值合并(k2,{v2,v2,…})。这些kye/value结合后的会被传递到reducer模块,reducer会将他们转化为另一种key/value对(k3,v3)。

maper和reducer

maper和reducer

核心组件:

Mapper与Reducer和MapReduce框架。

Mapper功能:

一个mapper与reducer一起组成一个Hadoop作业。Mapper是作业的强制性部分,可以产生0个或者更多个key/value(k2,v2)键值对。

Reducer功能:

Reducer是作业的可选择性部分,可以没有产出或者产出更多的key/value对(k3,v3)。

MapReduce功能:

调度、同步、容错

MapReduce框架的主要任务(根据用户提供的代码)是统筹所有任务的协调执行。

包括

1.选择合适的机器(节点)运行mapper、启动与监控mapper的执行

2.为reducer的执行选择合适的节点,对mapper的输出进行排序与拉去并且传送给reducer节点、?启动与监控reducer的执行。

3.Mapreduce框架负责调度和监控任务,再次执行失败的任务。

现在我们对MapReduce有了一些了解,下面让我们进一步看看MapReduce作业是如何执行的。

二、Mapreduce作业执行

高层次的hadoop执行框架

高层次的hadoop执行框架

下面介绍MapReduce执行管道线的主要组件。

★? Driver:这是主程序,用来初始化MapReduce job。它定义了job的个性化配置,并且标注了所有的组件(包括输入输出格式,mapper与reducer,使用结合器,使用定制的分片器等等)。Driver也可以获得job执行的状态。

★? Context:driver、mapper与reducer在不同的阶段被执行,一般情况下是在多台节点上执行。context对象在MapReduce执行的任何阶段都可以被使用。它为交换需要的系统与job内部信息提供一种方便的机制。要注意context协调只发生在MapReduce job 开始后合适的阶段(driver,map或者reduce)。这意味着在一个mapper中设置的值不可以在另一个mapper中使用(即使另一个mapper在第一个mapper完成后开始),但是在任何reducer中都是有效的。

★? Input Data:为MapReduce任务准备的最初存储数据。这些数据可以在HDFS,HBase,或者其他的仓库中。一般情况下,input data 是非常大的,几十个G或者更多。

InputFormat:如何对输入数据进行读取和切分

★ ?InputFormat类确定input data中数据输入哪个任务的InputSplit,并且提供一个生成RecordReader的工厂方法,这个对象主要是读取inputSplit指定的文件。Hadoop提供了一些InputFormat类。InputFormat直接被job的driver调用来决定map任务执行的数目与地点(根据InputSplit)。

★? InputSplit:InputSplit确定一个在MapReduce中map任务的作业单元。处理一个数据集的MapReduce程序由几个(也可能是几百个)map任务组成。InputFormat(直接被job driver调用)确定在map阶段中map任务的数目。每个map任务操作一个单独的InputSplit。完成InputSplits的计算后,MapReduce框架会再合适的节点启动期望数目的map任务。

★? RecorReader:InputSplit确定map任务的工作机,但没有描述如何获得该数据。RecordReader类是真正从数据源读取数据的类(在map 任务中),并将数据转化为设和map执行的key/value对,并将他们传递给map方法。RecordReader由InputFormat定义。

★? Mapper:mapper负责在MapReduce程序中第一个阶段用户自定义作业的执行。从实现的角度看,mapper实现负责将输入数据转化成一些列的key/value对(k1,v1),这些键值对将被用于单个map的执行。一般情况下mapp会将输入的键值对转化为另一种输出键值对(k2,v2),这些输出键值对将会作为reduce阶段shuffle与sort阶段的输入。一个新的mapper实例在每个map任务的单独的JVM实体中被实例化, 这些map任务构成所有作业输出的一部分。独立的mapper是不会提供任何与其他mapper通信的机制。这一点保证每个map任务的可靠性仅仅由本地节点的可靠性决定。

★? Partition:由所有独立的mapper产生的中间数据(k2,v2)的子集会被分配到一个reducer上执行。这些子集(或者partitions)会作为reduce任务的输入。具备相同键的数值会被一个reduce处理,而不会考虑他们有哪个mapper产生。这样的结果是,所有的map节点必须判断产生的中间数据将有哪个reducer执行。Partitioner类决定特定的key/value对将由哪个reducer执行。默认的Partitioner会为每个key计算一个哈希值,并根据这个值作为分配的依据。

★? Shuffle:在Hadoop集群中,每个节点可能会执行某个job的几个map任务。一旦至少有一个map函数执行完成,产生的中间输出就会根据key值进行分片,并将由map产生的的分片分发至需要它们的reducer。将map的输出传递到reducer的过程叫做shuffling。

★? Sort:每个reduce任务负责处理与部分key值相对应的value。中间key/value数据集,在被传递给reducer前会由Hadoop框架自动排序,组装成(k2,{v2,v2,…})的形式。

★? Reducer:reducer负责执行由用户提供的用于完成某个作业第二阶段任务的代码。对于分配到某个reducer中的每个key,reducer的reduce()方法都会被调用一次。这个方法接收一个key值,由迭代器遍历与它绑定在一起的所有value值,并无序的返回与这个key值相关的value值。一般情况下,reducer将输入的key/value转化成输出键值对(k3,v3)。

★? OutputFormat:job的输出(job的输出可以由reducer产生,若没有reducer也可由map产生)记录的方式有OutputFormat控制。OutputFormat负责确定输出数据的地址,由RecordWriter负责将数据结果写入。

★? RecordWriter:RecordWriter定义每条output记录如何写入。

下面将介绍MapReduce执行时两个可选的组件

★? Combiner:这是一个可以优化MapReduce job执行的可选执行步骤。如何选择后,combiner运行在mapper执行后,reduce执行前。Combiner的实例会运行在每个map任务中与部分reduce任务中。Combiner接收由mapper实例输出的所有数据作为输入,并且尝试将具有相同key值的value整合,以此来减少key值的存储空间和减少必须存储的(实际上不必须)key值的数目。Combiner的输出会被排序并发送给reducer。

★? Distribute cache:另一个常用与MapReduce job中的工具是distribute cache。这个组件可以使得集群中所有节点共享数据。Distribute cache可以是能够被所有任务都能可获得的共享库,包含key/value对的全局查找文件,jar文件(或者是archives)包含可执行代码等等。该工具会将这些文件拷贝至实际执行任务的节点,并使它们可以在本地使用。

三、MapReduce编程模型

MapReduce编程模型主要包括Mapper和Reducer两个内部类和主方法。

下面直接看代码:

package com.sven.mrlearn;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

public static class Map extends

Mapper {

private final static IntWritable One= new IntWritable(1);

private Text word = new Text();

@Override

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, one);

}

}

}

public static class Reduce extends

Reducer {

@Override

public void reduce(Text key, Iterable val, Context context)

throws IOException, InterruptedException {

int sum = 0;

Iterator values = val.iterator();

while (values.hasNext()) {

sum += values.next().get();

}

context.write(key, new IntWritable(sum));

}

}

public int run(String[] args) throws Exception {

Configuration cOnf= new Configuration();

Job job = new Job(conf, “Word Count”);

job.setJarByClass(WordCount.class);

// Set up the input

job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job, new Path(args[0]));

// Mapper

job.setMapperClass(Map.class);

// Reducer

job.setReducerClass(Reduce.class);

// Output

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

TextOutputFormat.setOutputPath(job, new Path(args[1]));

// Execute

boolean res = job.waitForCompletion(true);

if (res)

return 0;

else

return -1;

}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new WordCount(), args);

System.exit(res);

}

}

Mapper静态内部类

mapper包含三个主要的方法:setup, cleanup, and map。其中map是我们必须手动实现的。

setup和cleanup在一个指定的mapper周期内只执行一次。因此我们可以再这里做一些任务初始化的工作,如在setup打开共享文件,打开hbase等数据库连接。

同理cleanup用于清理任务,释放资源。

map是这中做繁忙的方法,它不断接受接受键值对,不断处理键值对和通过context写出结果键值对。值得注意的是map并不直接读取记录,而是由reader(该组件可重写)读取

然后通过context传递给map。

那么map是怎么执行的呢?

我们打开Mapper类,有这么一个run方法:

/**

* Expert users can override this method for more complete control over the

* execution of the Mapper.

* @param context

* @throws IOException

*/

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);

}

cleanup(context);

}

Reducer静态内部类

和Mapper类一样,Reducer类也有setup, cleanup, reduce,和一个run方法。

setup, cleanup, reduce方法和Mapper里面的setup, cleanup, map类似。唯一不同的是reduce接受的是一个key对应一个值的集合的迭代器。

(remember, a reducer is invoked ?after execution of shuffle and sort, at which point, all the input key/value pairs are sorted, and

all the values for the same key are partitioned to a single reducer and come together)

总结

这里是MapReduce的原理及编程模型的概要介绍。后面章节我们将介绍《MR总结-Mapreduce原理解析(一)》《MR总结-Mapreduce原理解析(一)》《MR总结-Mapreduce原理解析(一)》

推荐阅读
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 深入理解云计算与大数据技术
    本文详细探讨了云计算与大数据技术的关键知识点,包括大数据处理平台、社会网络大数据、城市大数据、工业大数据、教育大数据、数据开放与共享的应用,以及搜索引擎与Web挖掘、推荐技术的研究及应用。文章还涵盖了云计算的基础概念、特点和服务类型分类。 ... [详细]
  • 本文介绍了Hadoop的核心组件,包括高可靠性和高吞吐量的分布式文件系统HDFS、分布式的离线并行计算框架MapReduce、作业调度与集群资源管理框架YARN以及支持其他模块的工具模块Common。 ... [详细]
  • 大数据领域的职业路径与角色解析
    本文将深入探讨大数据领域的各种职业和工作角色,帮助读者全面了解大数据行业的需求、市场趋势,以及从入门到高级专业人士的职业发展路径。文章还将详细介绍不同公司对大数据人才的需求,并解析各岗位的具体职责、所需技能和经验。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • Hadoop平台警告解决:无法加载本机Hadoop库的全面应对方案
    本文探讨了在Hadoop平台上遇到“无法加载本机Hadoop库”警告的多种解决方案。首先,通过修改日志配置文件来忽略该警告,这一方法被证明是有效的。其次,尝试指定本地库的路径,但未能解决问题。接着,尝试不使用Hadoop本地库,同样没有效果。然后,通过替换现有的Hadoop本地库,成功解决了问题。最后,根据Hadoop的源代码自行编译本地库,也达到了预期的效果。以上方法适用于macOS系统。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • Zookeeper作为Apache Hadoop生态系统中的一个重要组件,主要致力于解决分布式应用中的常见数据管理难题。它提供了统一的命名服务、状态同步服务以及集群管理功能,有效提升了分布式系统的可靠性和可维护性。此外,Zookeeper还支持配置管理和临时节点管理,进一步增强了其在复杂分布式环境中的应用价值。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 从理想主义者的内心深处萌发的技术信仰,推动了云原生技术在全球范围内的快速发展。本文将带你深入了解阿里巴巴在开源领域的贡献与成就。 ... [详细]
  • 精选10款Python框架助力并行与分布式机器学习
    随着神经网络模型的不断深化和复杂化,训练这些模型变得愈发具有挑战性,不仅需要处理大量的权重,还必须克服内存限制等问题。本文将介绍10款优秀的Python框架,帮助开发者高效地实现分布式和并行化的深度学习模型训练。 ... [详细]
  • 深入解析:存储技术的演变与发展
    本文探讨了从单机文件系统到分布式文件系统的存储技术发展过程,详细解释了各种存储模型及其特点。 ... [详细]
  • Hadoop Datanode DataXceiver 错误处理问题
    Ambari 每分钟会向 Datanode 发送一次“ping”请求以确保其正常运行。然而,Datanode 在处理空内容时没有相应的逻辑,导致出现错误。 ... [详细]
  • Python 数据可视化实战指南
    本文详细介绍如何使用 Python 进行数据可视化,涵盖从环境搭建到具体实例的全过程。 ... [详细]
author-avatar
縌风而行2010
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有