热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

hadoop学习之路(二)hadoop基本概念原理以及单词统计任务源码分析

在上一遍博文中,已经将hadoop集群环境搭建完毕。那么,接下来,笔者再根据安装过程中的一些名词对象进行解释,以及大致的运行

在上一遍博文中,已经将hadoop集群环境搭建完毕。那么,接下来,笔者再根据安装过程中的一些名词对象进行解释,以及大致的运行原理。最后,再获取hadoop-example jar中的单词计数源码,进行解释并在hadoop环境中运行。


一、hadoop基本概念

hadoop包括两个核心组成:
HDFS:分布式文件系统,存储海量的数据
MapReduce:并行处理框架,实现任务分解和调度。

整个HDFS三个重要角色:NameNode、DataNode和Client。NameNode可以看作是分布式文件系统中的管理者,主要负责管理文件系统的命名空间、集群配置信息和存储块的复制等。NameNode会将文件系统的Meta-data存储在内存中,这些信息主要包括了文件信息、每一个文件对应的文件块的信息和每一个文件块在DataNode的信息等。DataNode是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,同时周期性地将所有存在的Block信息发送给NameNode。Client就是需要获取分布式文件系统文件的应用程序。

MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。


二、hadoop运行机制

关于hadoop的运行机制,这里由于笔者还没真正弄透彻,只是知道一个大致的处理思想。下面就贴几张比较形象的图片:
HDFS:
这里写图片描述
文件写入:
Client向NameNode发起文件写入的请求。
NameNode根据文件大小和文件块配置情况,返回给Client它所管理部分DataNode的信息。
Client将文件划分为多个Block,根据DataNode的地址信息,按顺序写入到每一个DataNode块中。
文件读取:
Client向NameNode发起文件读取的请求。
NameNode返回文件存储的DataNode的信息。
Client读取文件信息。
文件Block复制:
NameNode发现部分文件的Block不符合最小复制数或者部分DataNode失效。
通知DataNode相互复制Block。
DataNode开始直接相互复制。

MapReduce工作原理:
这里写图片描述
一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。

MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4(文件块);然后使用fork将用户进程拷贝到集群内其它机器上。
user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或Reduce作业),worker数量可由用户指定的。
被分配了Map作业的worker,开始读取对应文件块的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业(包含多个map函数)从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。
缓存的中间键值对会被定期写入本地磁盘。主控进程知道Reduce的个数,比如R个(通常用户指定)。然后主控进程通常选择一个哈希函数作用于键并产生0~R-1个桶编号。Map任务输出的每个键都被哈希起作用,根据哈希结果将Map的结果存放到R个本地文件中的一个(后来每个文件都会指派一个Reduce任务)。
master通知分配了Reduce作业的worker它负责的分区在什么位置。当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的。
reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。
当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。
所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意Map/Reduce作业和map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。

函数说明 pid_t fork( void)

一个现有进程可以调用fork函数创建一个新进程。由fork创建的新进程被称为子进程。fork函数被调用一次但返回两次。两次返回的唯一区别是子进程中返回0值而父进程中返回子进程ID。子进程是父进程的副本,它将获得父进程数据空间、堆、栈等资源的副本。注意,子进程持有的是上述存储空间的“副本”,这意味着父子进程间不共享这些存储空间。
这里关于hadoop的原理参考了以下两篇博文:
HDFS的运行原理:http://www.cnblogs.com/laov/p/3434917.html,
MapReduce工作原理:http://www.cnblogs.com/kaituorensheng/p/3958862.html


三、单词统计源码分析

这里,笔者搭建一个简单的maven项目,添加hadoop依赖,将hadoop src下的单词计数代码迁移过来,稍作修改,结构如下:
这里写图片描述
pom.xml:


<project xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns&#61;"http://maven.apache.org/POM/4.0.0"xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"><modelVersion>4.0.0modelVersion><parent><groupId>org.qiyongkang.hadoopgroupId><artifactId>qyk_hadoop_1.xartifactId><version>0.0.1-SNAPSHOTversion>parent><groupId>org.qiyongkang.hadoopgroupId><artifactId>qyk_hadoop_1_wordcountartifactId><version>0.0.1-SNAPSHOTversion><name>qyk_hadoop_1_wordcountname><url>http://maven.apache.orgurl><properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding>properties><dependencies><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>3.8.1version><scope>testscope>dependency>dependencies><build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-jar-pluginartifactId><version>2.4version><configuration><archive><manifest><addClasspath>falseaddClasspath><classpathPrefix>lib/classpathPrefix><mainClass>org.qiyongkang.hadoop.wordcount.WordCountmainClass>manifest>archive>configuration>plugin>plugins>build>
project>

然后&#xff0c;再来看看主类WordCount.java&#xff1a;

package org.qiyongkang.hadoop.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static void main(String[] args) throws Exception {Configuration conf &#61; new Configuration();String[] otherArgs &#61; new GenericOptionsParser(conf, args).getRemainingArgs();//如果没有两个输入参数&#xff0c;则不执行if (otherArgs.length !&#61; 2) {System.err.println("Usage: wordcount ");System.exit(2);}//创建一个作业Job job &#61; new Job(conf, "word count");//设置job的main classjob.setJarByClass(WordCount.class);//设置Mapperjob.setMapperClass(TokenizerMapper.class);//在MapReduce中&#xff0c;当map生成的数据过大时&#xff0c;带宽就成了瓶颈&#xff0c;怎样精简压缩传给Reduce的数据&#xff0c;有不影响最终的结果呢。//有一种方法就是使用Combiner&#xff0c;Combiner号称本地的Reduce&#xff0c;Reduce最终的输入&#xff0c;是Combiner的输出job.setCombinerClass(IntSumReducer.class);//设置Reducerjob.setReducerClass(IntSumReducer.class);//设置输出key和value的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入文件所在路径FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//设置输出结果所在路径FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

再就是Mapper类&#xff1a;

package org.qiyongkang.hadoop.wordcount;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*** ClassName:TokenizerMapper
* Function: 分.
* Date: 2016年2月23日 下午3:20:19
* &#64;author qiyongkang* &#64;version * &#64;since JDK 1.6* &#64;see */

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one &#61; new IntWritable(1);private Text word &#61; new Text();&#64;Overrideprotected void map(Object key, Text value, Mapper.Context context)throws IOException, InterruptedException {//我们当时放入到hdfs中input目录下的文件&#xff0c;每个文件都是包含单词的字符文本StringTokenizer itr &#61; new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {//这里取到每一个单词&#xff0c;作为map的输出&#xff0c;也就是reduce的输入word.set(itr.nextToken());//单词作为key&#xff0c;1作为value&#xff0c;因为每获取一个单词就记1context.write(word, one);}}}

最后&#xff0c;就是Reducer类&#xff1a;

package org.qiyongkang.hadoop.wordcount;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** ClassName:IntSumReducer
* Date: 2016年2月23日 下午3:23:34
* &#64;author qiyongkang* &#64;version * &#64;since JDK 1.6* &#64;see */

public class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result &#61; new IntWritable();&#64;Overrideprotected void reduce(Text key, Iterable values,Reducer.Context context) throws IOException, InterruptedException {//这里map的输出都是单词&#xff0c;1的形势&#xff0c;洗牌后&#xff0c;我们把相同key也就是单词进行汇总int sum &#61; 0;for (IntWritable val : values) {sum &#43;&#61; val.get();}result.set(sum);//以单词 数量的格式输出context.write(key, result);}}

相关的解释已在注解中标明。
然后&#xff0c;运行mvn package便可打成jar包&#xff0c;再将此包上传到服务器&#xff0c;这里笔者放在200服务器的/root目录下。


四、运行单词统计并查看作业运行状态

启动hadoop后&#xff0c;运行hadoop jar /root/qyk_hadoop_1_wordcount-0.0.1-SNAPSHOT.jar input output&#xff0c;可以看到&#xff1a;
这里写图片描述&#xff0c;
然后&#xff0c;运行hadoop fs -cat output/*&#xff0c;查看统计结果&#xff1a;
这里写图片描述&#xff0c;
最后访问http://172.31.26.200:50030/可以查看此次job运行的情况&#xff1a;
这里写图片描述
还可以点击查看任务运行详情&#xff1a;
这里写图片描述
好了&#xff0c;今天就介绍到这儿了。


推荐阅读
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • Request对象和Response对象request:(请求)当一个页面被请求时,Django就会创建一个包含本次请求原信息的HttpRequest对象。Djang ... [详细]
  • MR程序的几种提交运行模式本地模型运行1在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行-- ... [详细]
  • t-io 2.0.0发布-法网天眼第一版的回顾和更新说明
    本文回顾了t-io 1.x版本的工程结构和性能数据,并介绍了t-io在码云上的成绩和用户反馈。同时,还提到了@openSeLi同学发布的t-io 30W长连接并发压力测试报告。最后,详细介绍了t-io 2.0.0版本的更新内容,包括更简洁的使用方式和内置的httpsession功能。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • HTML学习02 图像标签的使用和属性
    本文介绍了HTML中图像标签的使用和属性,包括定义图像、定义图像地图、使用源属性和替换文本属性。同时提供了相关实例和注意事项,帮助读者更好地理解和应用图像标签。 ... [详细]
  • 本文介绍了关于smarty自定义缓存名的解决思路,通过放弃生成缓存,直接生成html的静态页面来提高速度。同时提供了一个参考链接供参考。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • Vagrant虚拟化工具的安装和使用教程
    本文介绍了Vagrant虚拟化工具的安装和使用教程。首先介绍了安装virtualBox和Vagrant的步骤。然后详细说明了Vagrant的安装和使用方法,包括如何检查安装是否成功。最后介绍了下载虚拟机镜像的步骤,以及Vagrant镜像网站的相关信息。 ... [详细]
  • Maven构建Hadoop,
    Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 用pandas库修改excel文件里的内容,并把excel文件格式存为csv格式,再将csv格式改为html格式
    假设有Excel文件data.xlsx,其中内容为:     ID age height    sex weight张三  1  39    181 female     85李四  2  40    180   male     80王五  3  38    178 female     78赵六  4  59    1 ... [详细]
  •        在搭建Hadoop环境之前,请先阅读如下博文,把搭建Hadoop环境之前的准备工作做好,博文如下:       1、CentOS6.7下安装JDK,地址:http:b ... [详细]
author-avatar
手机用户2502908277
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有