热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop小小笔记

偶遇JobClient这两年在在整一个云计算的东西,但工作主要集中在Client端。对Hadoop早有耳闻,但一直没有机会,前几天看到

>> 偶遇JobClient

这两年在在整一个云计算的东西,但工作主要集中在Client端。

对Hadoop早有耳闻,但一直没有机会,前几天看到了JobClient这个熟悉的字眼,所以就把Hadoop的源代码拖来,找个机会看看。倒不是想用Hadoop干什么事情,了解了解,免得“云深不知处”。

虽然Hadoop是用Java开发的,但问题不大,基本上能看懂。Hadoop当然是博大精深,包含了conf/DFS/io/ipc/MapReduce几个部分,但我也只是挑了MapReduce的代码作为观摩对象:

感兴趣的文件夹:

...\src\mapred\org\apache\hadoop\mapred

...\src\mapred\org\apache\hadoop\mapreduce

感兴趣的类:

JobTracker/TaskTracker/

JobID/JobProfile/JobContext

JobInProgress/TaskInProgress/MapTask/ReduceTask

JobHistory/JobHistoryServer

 

>> 关于MapReduce

MapReduce模型隐藏了并行化,容错,位置优化和负载均衡的细节,使用起来比较简洁。

 1. MapReduce == Map -> Combine -> Reduce

Map-Reduce框架的运作完全基于对,也就是说数据的输入是一批对,生成的结果也是一批对,只是有时候它们的类型不一样而已。

由于Key和value的类需要支持被序列化操作,它们必须要实现Writable接口。此外,key的类还必须实现WritableComparable接口,以便可以让框架对数据集的执行排序操作。

一个Map-Reduce任务的执行过程以及数据输入输出的类型如下所示:

(input) -> map -> -> combine -> -> reduce -> (output)

 

 2. 例子: WordCount 1.0

MapReduce Tutorial中有一个WordCount的例子,要求从读取两个文本文件并计算文本中每个单词的总数。

 

源代码:

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

// Mapper之Map方法
public static class Map extends MapReduceBase implements Mapper {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}

// Reducer之Reduce方法
public static class Reduce extends MapReduceBase implements Reducer {
public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
// Job Configuraion
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

// 设置Mapper/Combiner/Reducer
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

// 设置输入/输出的格式,此处均为Text
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

// 运行Job
JobClient.runJob(conf);
}
}

 

Inputs(file01 & file02):

-------------------------------------------------------------------

../wordcount/input/file01    Hello World Bye World
../wordcount/input/file02:     Hello Hadoop Goodbye Hadoop

-------------------------------------------------------------------

Output:

------------------

    Bye 1
    Goodbye 1
    Hadoop 2
    Hello 2
    World 2

------------------

 

Workflow:

Step1: Mapper

Mapper通过map方法每次处理一行文本&#xff0c;然后利用StringTokenizer将其分离成Tokens&#xff0c;然后就将键值对<, 1>输出&#xff0c;它将作为Combine的输入。

----------------------------

the first map emits:



The second map emits:



-----------------------------

Step2: Combine

在WordCount这个例子中&#xff0c;Combiner与Reducer是一样的&#xff0c;Combiner类负责将相同key的值合并起来。

----------------------------------

The output of the first map:


The output of the second map:


 ----------------------------------

Step3&#xff1a; Reduce

Reducer类通过reduce方法&#xff0c;计算每个单词的总数&#xff0c;从而得到最终的输出。

-----------------------------------

Thus the output of the job is:




-----------------------------------

 

>> MapReduce Architecture

--------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------

 

>> JobClient

每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成jar文件存储在HDFS&#xff0c;并把路径提交到 JobTracker的master服务&#xff0c;然后由master创建每一个Task&#xff08;即MapTask和ReduceTask&#xff09;将它们分发到各个 TaskTracker服务中去执行。

Methods:

JobClient.runJob()

JobClient.submitJob

JobClient.killJob()

 

>> JobTracker

它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。 master负责调度job的每一个子任务task运行于slave上&#xff0c;并监控它们&#xff0c;如果发现有失败的task就重新运行它&#xff0c;slave则负责直接执行每 一个task。TaskTracker都需要运行在HDFS的DataNode上&#xff0c;而JobTracker则不需要&#xff0c;一般情况应该把JobTracker 部署在单独的机器上。

JobTracker is a daemon per pool that administers all aspects of mapred activities.

JobTracker keeps all the current jobs by containing instances of JobInProgress.

Methods:

JobTracker.submitJob(): creates/adds a JobInProgress to jobs and jobsByArrival

JobTracker.pollForNewTask()

 

>> JobInProgress/TaskInProgress

 JobInProgress represents a job as it is being tracked by JobTracker.

 TaskInProgress represents a set of tasks for a given unique input, where input is a split for map task or a partition for reduce task.

 

>> MapTask/ReduceTask:
MapTask offers method run() that calls MapRunner.run(), which in turn calls the user-supplied Mapper.map().
ReduceTask offers run() that sorts input files using SequenceFile.Sorter.sort(), and then calls user-supplied Reducer.reduce().

 

>> 其他

Hadoop的Task Recovery机制还是比较有意思的&#xff0c;它可以重新尝试运行失败的Task&#xff0c;具体可以看看JobTracker.RecoveryManager。

 

// I should borrow some concept of Hadoop to SolidMCP
//    RunningJob
//    Reporter
//    JobClient
//    JobHistory.HistoryCleaner
//    JobHistory.JobInfo
//    JobHistory.Listener
//    JobProfile
//    TaskReport
//    TaskTracker
//    TaskLog
//    JobQueueInfo
//    JobContext
//    JobEndNotifier
//    JobControl

 

References:

http://wiki.apache.org/hadoop/HadoopMapRedClasses

http://sebug.net/paper/databases/nosql/Nosql.html

 

转:https://www.cnblogs.com/piaoger/archive/2012/01/31/2332655.html



推荐阅读
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • java.lang.UnsatisfiedLinkError: …….io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    在利用hadoop运行MapReduce项目时,提示报错(注意最后是Z):Exceptioninthreadmainj ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • importorg.apache.hadoop.hdfs.DistributedFileSystem;导入方法依赖的package包类privatevoidtestHSyncOpe ... [详细]
  • MapReduce工作流程最详细解释
    MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用使用Map和Reduce函数,所以对其整体的计算过程不是太 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Android源码深入理解JNI技术的概述和应用
    本文介绍了Android源码中的JNI技术,包括概述和应用。JNI是Java Native Interface的缩写,是一种技术,可以实现Java程序调用Native语言写的函数,以及Native程序调用Java层的函数。在Android平台上,JNI充当了连接Java世界和Native世界的桥梁。本文通过分析Android源码中的相关文件和位置,深入探讨了JNI技术在Android开发中的重要性和应用场景。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
author-avatar
阿离说你是宝贝
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有