热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop知识分享文稿(byquqi99)

Hadoop知识分享文稿(byquqi99)作者:张华写于:2010-08-15发表于:2011-03-31版权声明:可

                              Hadoop知识分享文稿 ( by quqi99 )

 

 

作者:张华 写于:2010-08-15   发表于:2011-03-31
版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明

( http://blog.csdn.net/quqi99 )

 

内容目录

目 录

1hadoop理论基础3

1.1hadoop是什么3

1.2hadoop项目3

1.3Map/Reduce任务的运行流程4

1.4Map/Reduce任务的数据流图5

2hadoop入门实战7

2.1测试环境7

2.2测试程序7

2.3属性配置9

2.4免密码SSH设置10

2.5配置hosts11

2.6格式化HDFS文件系统11

2.7启动守护进程11

2.8运行程序11

3hadoop高级进阶12

4hadoop应用案例12

5参考文献12

 

 

 

1hadoop理论基础

1.1hadoop是什么

HadoopDougCutting开发的,他是一个相当牛的哥们,他同时是大名鼎鼎的LuceneNutch的作者。

我是这样理解hadoop的,它就是用来对海量数据进行存储与分析的一个开源软件。它包括两块:

1HDFS( Hadoop Distrubuted File System ),可以对重要数据进行冗余存储,有点类似于冗余磁盘陈列。

2)对Map/Reduce编程模型的一个实现。当然,关系型数据库(RDBMS)也能做类似的事情,但为什么不用RDBMS呢?我们知道,让计算移动于数据上比让数据移动到计算更有效率。这使得Map/Reduce适合数据被一次写入和多次读取的应用,而RDBMS更适合持续更新的数据集。

1.2hadoop项目

如今,广义上的Hadoop已经发展成为一个分布式计算基础架构这把“大伞”下相关子项目的集合,其技术栈如下图所示:

 

图:

                                        

 

                                                    图1 hadoop的子项目


  • Core:一系列分布式文件系统和通用I/O的组件和接口(序列化、JavaRPC和持久化数据结构)

  • Avro:用于数据的序列化,当然,JDK中也有Seriable接口,但hadoop中有它自己的序列化方式,具说更有效率。

  • MapReduce:分布式数据处理模式和执行环境,运行于大型商用机集群。

  • HDFS:分布式文件系统,运行于大型商用机集群。

  • PigHDFS上的数据检索语言,类似于RDBMS中的SQL语言。

  • Hbase:一个分布式的、列存储数据库。HBase使用HDFS作为底层存储,同时支持MapReduce的批量式计算和点查询(随机读取)

  • ZooKeeper:一个分布式的、高可用性的协调服务。ZooKeeper提供分布式锁之类的基本服务用于构建分布式应用。

  • Hive:分布式数据仓库。Hive管理HDFS中存储的数据,并提供基于SQL的查询语言(由运行时引擎翻译成MapReduce作业)用以查询数据。

  • Chukwa:分布式数据收集和分析系统。Chukwa运行HDFS中存储数据的收集器,它使用MapReduce来生成报告。

1.3Map/Reduce任务的运行流程


                    

JobClientsubmitJob()方法的作业提交过程如下:

1)向Jobtraker请求一个新作业ID

2)调用JobTrackergetNewJobId()

3JobClient进行作业划分,并将划分后的输入及作业的JAR文件、配置文件等复制到HDFS中去

4)提交作业,会把此调用放入到一个内部的队列中,交由作业调度器进行调度。值得一提的是,针对 Map任务与Reduce任务,任务调度器是优先选择Map任务的,另外,任务调度器在选择Reduce任务时并没有考虑数据的本地化。然而,针对一个Map任务,它考虑的是Tasktracker网络位置和选取一个距离其输入划分文件最近的Tasktracker,它可能是数据本地化的,也可能是机架本地化的,还可能得到不同的机架上取数据。

5)初始化包括创建一个代表该正在运行的作业的对象,它封装任务和记录信息,以便跟踪任务的状态和进度。

6JobTracker任务调度器首先从共享文件系统中获取JobClient已计算好的输入划分信息,然后为每个划分创建一个Map任务。创建的reduce任务的数量是由JobConfMapred.reduce.tasks属性决定,它是用setNumReduceTask()方法来设置的。

7TaskTracker执行一个简单的循环,定期发送心跳(Heartbeat)方法调用Jobtracker告诉是否还活着,同时,心跳还会报告任务运行的是否已经准备运行新的任务。

8TaskTracker已经被分配了任务,下一步是运行任务。首先它需要将它所需的全部文件从HDFS中复制到本地磁盘。

9)紧接着,它要启动一个新的Java虚拟机来运行每个任务,这使得用户所定义的MapReduce函数的任务缺陷都不会影响TaskTracker(比如导致它崩溃或者挂起)

10)运行Map任务或者Reduce任务,值得一提的是,这些任务使用标准输入与输出流,换句话说,你可以用任务语言(如JAVAC++Shell等)来实现MapReduce,只要保证它们也使用标准输入与输出流,就可以将输出的键值对传回给JAVA进程了。

1.4Map/Reduce任务的数据流图


 

 

        图3Map/Reduce中单一Reduce任务的数据流图


 

 

                 图4Map/Reduce中多个Reduce任务的数据流图


 

                图5MapReduce中没有Reduce任务的数据流图


 

任务粒度:分片的个数,在将原始大数据切割成小数据集时,通常让小数据集小于或等于HDFS中的一个Block的大小(缺省是64M),这样能够保证一个小数据集位于一台计算机上,便于本地计算。M 小数据集待处理,就启动M Map 任务,注意这M Map 任务分布于N 台计算机上并行运行,Reduce任务的数量 R则可由用户指定

Map: 输入 输出List()

Reduce :输入 输出

分区(Partition): Map 任务输出的中间结果按key 的范围划分成R (R 是预先定义的 Reduce任务的个数),划分时通常使用hash 函数如:hash(key) mod R,这样可以保证某一段范围内的key,一定是由一个Reduce 任务来处理,可以简化Reduce 的过程

Combine: partition之前,还可以对中间结果先做combine,即将中间结果中有相同key 对合并成一对。combine的过程与Reduce的过程类似,很多情况下就可以直接使用Reduce函数,但combine是作为 Map任务的一部分,在执行完Map函数后紧接着执行的。Combine能够减少中间结果中 对的数目,从而减少网络流量。

下面举个例子来着重说明Combinehadoop允许用户声明一个combiner运行在Map的输出上,它的输出再作为Reduce的输入。例如,找出每一年的最调气温:

假如用户的输入的分片数是2,那么:

1)第一个Map的输出如下:

19500

195020

195010

2)第二个Map的输出如下:

195025

195015

3Reduce的输入如下:

1950,[020102515])

注意:如果有combine的话,此时Reduce的输入应该是:

max(0,20, 10, 25, 15) = max(max(0,20,10), max(25,15)) = max(20,25)

combine并不能取代reduce,例如,如果我们计算平均气温,便不能使用combine,因为:

mean(0,20,10,25,15)= 14

但是:

mean(mean(0,20,10),mean(25,15)) = mean(10,20) = 15

4Reduce的输出如下:

195025

2hadoop入门实战

hadoop有三种部署模式:

  • 单机模式:没有守护进程,一切都运行在单个JVM上,适合测试与调试。

  • 伪集群模式:守护进程在本地运行,适合模拟集群。

  • 集群模式:守护进程运行在集群的某台机器上。

所以,在以上任一特定模式运行hadoop时,只需要做两件事情:

1)设置适当属性

2)启动hadoop的守护进程(名称节点,二级名称节名,数据节点)

hadoop默认的是单机模式,下面,我们将着重介绍在集群模式是如何部署?

2.1测试环境

用两台机器做为测试环境,通常,集群里的一台机器被指定为NameNode,另一台不同的机器被指定为JobTracker,这些机器是masters;余下的机器即作为DataNode作为TaskTracker,这些机器是slaves

1master(JobTracker & NameNode):我的工作机( zhanghua.quqi.com)

2slave(TaskTracker & DataNode):我的开发机(tadev03.quqi.com)

3) 两机均已安装sshrsync

2.2测试程序

1/home/workspace/hadoopExample/input/file01:

HelloWorld Bye World

2)/home/workspace/hadoopExample/input/file02:

HelloHadoopGoodbye Hadoop

  1. WordCount.java

     

packagecom.TripResearch.hadoop;

 

importjava.io.IOException;

importjava.util.Iterator;

importjava.util.StringTokenizer;

 

importorg.apache.hadoop.fs.Path;

importorg.apache.hadoop.io.IntWritable;

importorg.apache.hadoop.io.LongWritable;

importorg.apache.hadoop.io.Text;

importorg.apache.hadoop.mapred.FileInputFormat;

importorg.apache.hadoop.mapred.FileOutputFormat;

importorg.apache.hadoop.mapred.JobClient;

importorg.apache.hadoop.mapred.JobConf;

importorg.apache.hadoop.mapred.MapReduceBase;

importorg.apache.hadoop.mapred.Mapper;

importorg.apache.hadoop.mapred.OutputCollector;

importorg.apache.hadoop.mapred.Reducer;

importorg.apache.hadoop.mapred.Reporter;

importorg.apache.hadoop.mapred.TextInputFormat;

importorg.apache.hadoop.mapred.TextOutputFormat;

/**

*@authorhuazhang

*/

@SuppressWarnings("deprecation")

publicclassWordCount {

 

publicstaticclassMyMap extendsMapReduceBaseimplements

Mapper {

privatefinalstaticIntWritable one= newIntWritable(1);

privateText word =newText();

 

publicvoidmap(LongWritable key, Text value,

OutputCollector output, Reporter reporter)

throwsIOException {

Stringline = value.toString();

StringTokenizertokenizer = newStringTokenizer(line);

while(tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

output.collect(word,one);

}

}

}

 

publicstaticclassMyReduce extendsMapReduceBaseimplements

Reducer {

publicvoidreduce(Text key, Iterator values,

OutputCollector output, Reporter reporter)

throwsIOException {

intsum = 0;

while(values.hasNext()) {

sum+= values.next().get();

}

output.collect(key,newIntWritable(sum));

}

}

 

publicstaticvoidmain(String[] args) throwsException {

JobConfcOnf= newJobConf(WordCount.class);

conf.setJobName("wordcount");

 

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

 

conf.setMapperClass(MyMap.class);

conf.setCombinerClass(MyReduce.class);

conf.setReducerClass(MyReduce.class);

 

conf.setInputFormat(TextInputFormat.class);

conf.setOutputFormat(TextOutputFormat.class);

 

FileInputFormat.setInputPaths(conf,newPath(args[0]));

FileOutputFormat.setOutputPath(conf,newPath(args[1]));

 

JobClient.runJob(conf);

}

}

 


2.3属性配置

按下图所示修改至少3个属性, 如下图所示:

  

 

  1.  
    1. conf/core-site.xml

      fs.default.name

      hdfs://zhanghua.quqi.com:9000

      注意:此处如果是伪集群模式可配置为hdfs://localhost:9000,是本地模式则为:localhost:9000。另外,其他输入输入路径,是本地模式是本地文件系统的路径,是非地模式,用hdfs文件系统的路径格式。

    2. conf/hdfs-site.xml

      dfs.replication

      1

    3. conf/mapred-site.xml

      mapred.job.tracker

      zhanghua.quqi.com:8021

    4. masters

      zhanghua.quqi.com(伪分布模式就配成localhost)

    5. slaves

      tadev03.quqi.com(伪分布模式就配成localhost)

    6. 将以上配置好的hadoop文件夹拷到所有机器的相同目录下:

      scp-r /home/soft/hadoop-0.20.2root@tadev03.quqi.com:/home/soft/hadoop-0.20.2

      注意:确保两台机器的JAVA_HOME的路径一致,如果不一致,就要改

 

hadoop所有可配置的配置文件说明如下:

hadoop-env.sh 运行hadoop的脚本中使用的环境变量

core-site.xml hadoop的核心配置,如HDFSMapReduce中很普遍的I/O设置

hdfs-site.xml HDFS后台程序设置的配置:名称节点,第二名称节点及数据节点

mapred-site.xml MapReduce后台程序设置的配置:jobtrackertasktracker

masters 记录运行第二名称节点的机器(一行一个)的列表

slaves 记录运行数据节点的机器(一行一个)的列表

2.4免密码SSH设置

免密码ssh设置,保证至少从master可以不用口令登陆所有的slaves

1)生成密钥对:ssh-keygen-t rsa -P '' -f /root/.ssh/id_rsa (这样密钥就留在了客户端)

2)将公钥拷到要连接的服务器,

scp/root/.ssh/id_rsa.pub root@tadev03.quqi.com:/tmp

ssh-l root tadev03.quqi.com

more/tmp/id_rsa.pub >> /root/.ssh/authorized_keys

  1. sshtadev03.quqi.com 不需要输入密码即为成功。

(注意:伪分布模式也要配置sshlocalhost无密码登录,如果是mac,请将ssh打开)

(另外,在mac中请在hadoop-config.sh文件中配置exportJAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home)

 

三条控制线线:

SSH → 这样就可以直接从主节点远程启动从节点上的脚本,如sshtadev03.quqi.com '/var/aa.sh'

NameNode (http://localhost:50070) →DataNode

JobTracker(http://localhost:50030)→TaskTracker (http://localhost:50060)

2.5配置hosts

必须配置masterslaves之间的双向hosts.修改/etc/hosts进行配置,略。

2.6格式化HDFS文件系统

和我们常见的NTFSFAT32文件系统一样,NDFS最开始也是需要格式化的。格式化过程用来创建存储目录以及名称节点的永久数据结构的初始版本来创建一个空的文件系统。命令如下:

hadoopnamenode -format

 

已知问题:在重新格式化时,可能会报:SHUTDOWN_MSG:Shutting down NameNode

解决办法:rm-rf /tmp/hadoop-root/dfs/name

2.7启动守护进程

1)启动HDFS守护进程:start-dfs.sh

(start-dfs.sh脚本会参照NameNode${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动DataNode守护进程。)

已知问题:在已设置JAVA_HOME的情况下仍会报:Error:JAVA_HOME is not set

解决办法:我是在hadoop.sh文件中加下面一句解决的:

JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home

2)启动Map/Reduce守护进程:start-mapred.sh

(start-mapred.sh脚本会参照JobTracker${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动TaskTracker守护进程)

3)启动成功后,可以通过访问http://localhost:50030验证。

 

注意:也可直接使用start-all.shstop-all.sh脚本,在主节点master上面启动hadoop,主节点会启动/停止所有从节点的hadoop。会启动5java进程,同时会在/tmp目录下创建五个pid文件记录这些进程ID号。通过这五个文件,可以得知namenode,datanode, secondary namenode, jobtracker, tasktracker 分别对应于哪一个Java进程。

 

已知问题:启动后,日志中报:java.io.IOException:File /tmp/hadoop-root/mapred/system/jobtracker.info could only bereplicated to 0 nodes, instead of 1

解决办法:原因是tadev03.quqi.com机器上无法pingzhanghua.quqi.com

2.8运行程序

先将测试数据及其他输入由本地文件系统拷到HFDS文件系统中去(注意:jar除外

  1.  
    1. hadoopfs -mkdir input

    2. hadoopfs -ls .

    3. hadoopfs -copyFromLocal /home/workspace/hadoopExample/input/file01input/file01

    4. hadoopfs -copyFromLocal /home/workspace/hadoopExample/input/file02input/file02

       

      这时候就可以执行下列命令运行程序了,注意:后面的input, output等目录都是HDFS文件系统的路径。(如果是本地模式,就用本地文件系统的绝对路径)

  1.  

      hadoopjar/home/workspace/hadoopExample/hadoopExample.jarcom.TripResearch.hadoop.WordCount input/output

      已知问题:在集群模式下运行时任务会Pending

 

最后,运行下列命令查看结果:

/home/soft/hadoop-0.20.2/bin/hadoopfs -cat output/part-00000

 

也可访问下列地址查看状态:

NameNode– http://zhanghua.quqi.com:50070/

    JobTracker- http://zhanghua.quqi.com:50030/

常用命令说明如下:

hadoopdfs –ls 查看/usr/root目录下的内容径;
hadoop dfs –rmr xxx xxx
就是删除目录;
hadoop dfsadmin -report
这个命令可以全局的查看DataNode的情况;
hadoop job -list
后面增加参数是对于当前运行的Job的操作,例如list,kill等;
hadoop balancer
均衡磁盘负载的命令。

3hadoop高级进阶

4hadoop应用案例

5参考文献
  1. http://hadoop.apache.org/common/docs/r0.18.2/cn/

  2. hadoop0.20.2集群配置入门http://dev.firnow.com/course/3_program/java/javajs/

  3. Hadoop分布式文件系统(HDFS)初步实践http://huatai.me/?p=352

  4. Hadoop分布式部署实验2_格式化分布式文件系统http://hi.baidu.com/thinke365/blog/item/15602aa8f9074cf41e17a235.html

  5. hadoop安装出现问题(紧急),请前辈指教http://forum.hadoop.tw/viewtopic.php?f=4&t=90

  6. Hadoop进行分布式并行编程http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/index.html

  7. Hadoop进行分布式数据处理http://tech.ddvip.com/2010-06/1275983295155033.html

 


推荐阅读
author-avatar
mobiledu2502870957
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有