蒂姆是我们最受欢迎和最多产的作家之一。 在developerWorks上浏览Tim的所有文章 。 查看Tim的个人资料,并与他,其他作者以及My developerWorks中的其他读者建立联系。
本系列的前两篇文章重点介绍了针对单节点和多节点集群的Hadoop的安装和配置。 这篇最后的文章探讨了Hadoop中的编程-特别是Ruby语言中的map和reduce应用程序的开发。 我之所以选择Ruby,是因为它首先是您应该了解的一种很棒的面向对象的脚本语言,其次,您将在“ 相关主题”部分中找到大量参考资料,以获取有关 Java™和Python语言的教程。 通过对MapReduce编程的探索,我还将向您介绍流应用程序编程接口(API)。 该API提供了使用Java语言以外的其他语言开发应用程序的方法。
首先,从功能的角度出发,简要介绍一下映射和缩减,然后更深入地研究Hadoop编程模型及其构架,分配,管理工作的架构和元素。
那么,启发MapReduce编程范例的功能要素是什么? 1958年,约翰·麦卡锡(John McCarthy)发明了一种名为Lisp的语言,该语言既实现了数值计算又实现了符号计算,但是以递归的形式实现,而这种形式对于当今使用的大多数语言都是陌生的。 (实际上,在Wikipedia上Lisp有着令人着迷的历史,其中包括有用的教程,非常值得阅读。)Lisp最早在IBM®704上实现,这是第一台大规模生产的计算机,它还支持另一个古老的收藏:FORTRAN。
map
函数起源于像Lisp这样的功能语言,但现在在许多其他语言中很常见,它是函数在一系列元素上的应用。 这是什么意思? 清单1提供了与Scheme Shell(SCSH)的解释后的会话,它是Lisp的派生词。 第一行定义了一个名为square
的函数,该函数接受一个参数并发出其平方根。 下一行说明了map
函数的用法。 如图所示,使用map
,您可以提供函数以及该函数所应用到的元素列表。 结果是一个包含平方元素的新列表。
> (define square (lambda (x) (* x x)))
> (map square '(1 3 5 7))
'(1 9 25 49)
>
减少也适用于列表,但通常将列表减少为标量值。 清单2中提供的示例说明了另一个SCSH函数,用于将列表简化为标量-在这种情况下,将值列表以(1 +(2 +(3 +(4 +(5)))))的形式求和。 请注意,这是经典的函数式编程,它依赖于迭代的递归。
> (define (list-sum lis) (if (null? lis) 0 (+ (car lis) (list-sum (cdr lis)))))
> (list-sum '(1 2 3 4 5))
15
>
有趣的是,在命令式语言中,递归与迭代一样有效,因为递归在后台被翻译成迭代。
Google引入了MapReduce的想法,将其作为用于处理或生成大量数据的编程模型。 在规范模型中, map
函数处理键值对,从而产生一组中间的键值对。 然后, reduce
函数处理这些中间键/值对,合并关联键的值(请参见图1)。 输入数据以这样一种方式进行分区,即可以将其分布在一组机器中以进行并行处理。 以相同的方式,并行处理生成的中间数据,使该方法非常适合处理大量数据。
为了快速上手,请从map的角度看一下图1的体系结构,并减少字数(因为在本文中您将开发map并减少应用程序)。 提供输入数据后(进入Hadoop文件系统[HDFS]),首先将其分区,然后(通过作业跟踪器)分配给地图工作人员。 尽管图2中的示例显示了一个简短的句子被分区的情况,但是通常由于以下原因,要分区的工作量在128MB大小范围内:花费很少的时间进行设置,因此有更多的工作要做以最小化这开销。 地图工作人员(在规范示例中)将工作分解为单个向量,这些向量包含标记词和初始值(在本例中为1)。 映射任务完成时(由任务跟踪程序在Hadoop中定义),将工作提供给reduce worker。 reduce worker将密钥还原为唯一的集合,该值表示找到的密钥数量。
请注意,此过程可以在同一台计算机上或不同计算机上进行,或者使用不同的数据分区依次或并行进行,结果仍然相同。
尽管规范视图(用于使用字数统计生成搜索索引)是查看Hadoop的一种方法,但事实证明,这种计算模型可以通用地应用于许多计算问题,您将看到。
从图2所示的简单示例中,请注意,两个主要元素是map
和reduce
流程。 尽管对于这些过程的工作方式有传统的看法,但map
和reduce
以这种方式运行并不需要体系结构。 这是Hadoop的真正功能-灵活地实施map
并reduce
以解决特定应用程序方式运行的流程。 字数示例非常有用,并且适用于许多问题,但是其他模型仍然适用于此通用框架。 所需要做的就是开发地图并减少应用程序,使流程对Hadoop可见。
在其他应用程序中,Hadoop甚至被用于实现诸如神经网络,支持向量机和k- means聚类之类的算法的机器学习应用程序( 有关更多信息,请参见参考资料一节)。
尽管Hadoop是基于Java的框架,但可以使用Java语言以外的其他语言编写地图并简化应用程序。 流使这成为可能。 Hadoop中的streaming
实用程序实现了一种数据流胶水。 使用streaming
实用程序,您可以定义自己的映射并减少可执行文件(每个文件都从标准输入[stdin]中获取输入,并通过标准输出[stdout]提供输出),并且streaming
实用程序适当地读写数据,从而以需要(请参见清单3)。
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input inputData-output outputData-mapper map_exec-reducer reduce_exec
清单3展示了如何在Hadoop中使用streaming
实用程序,而图3以图形方式显示了如何定义流。 请注意,这是流使用的一个简单示例。 有许多选项可用于调整数据的解析方式,调整图像的调用方式,为分区器或合并器指定替换图像以及其他配置调整( 有关更多信息,请参阅“ 参考资料”部分)。
掌握了streaming
实用程序的基本知识之后,您就可以编写一个简单的Ruby映射并简化应用程序,并了解如何在Hadoop框架内使用这些流程。 此处的示例与规范的MapReduce应用程序一起使用,但是稍后您将看到其他应用程序(以及如何在map和reduce形式中实现它们)。
从映射器开始。 该脚本从stdin获取文本输入,对其进行标记化,然后向stdout发出一组键/值对。 像大多数面向对象的脚本语言一样,此任务几乎太简单了。 清单4中显示了mapper脚本(带有一些注释和空格,使它更大一些)。 该程序使用迭代器从stdin中读取一行,并使用另一个迭代器将该行拆分为单独的标记。 然后将每个标记(单词)以1的关联值(由制表符分隔)发送到stdout。
#!/usr/bin/env ruby# Our input comes from STDIN
STDIN.each_line do |line|# Iterate over the line, splitting the words from the line and emitting# as the word with a count of 1.line.split.each do |word|puts "#{word}\t1"endend
接下来,查看reduce应用程序。 这个稍微复杂一点,但是使用Ruby哈希(关联数组)来简化约简操作(参见清单5)。 该脚本再次处理来自stdin的输入数据(由streaming
实用程序传递),并将行拆分为一个单词和一个值。 然后检查哈希值中的单词; 如果找到,则将计数添加到元素。 否则,您将在单词的哈希中创建一个新条目,然后加载计数(在映射程序中应为1
)。 处理完所有输入后,您只需遍历哈希并将键值对发送到stdout。
#!/usr/bin/env ruby# Create an empty word hash
wordhash = {}# Our input comes from STDIN, operating on each line
STDIN.each_line do |line|# Each line will represent a word and countword, count = line.strip.split# If we have the word in the hash, add the count to it, otherwise# create a new one.if wordhash.has_key?(word)wordhash[word] += count.to_ielsewordhash[word] = count.to_iendend# Iterate through and emit the word counters
wordhash.each {|record, count| puts "#{record}\t#{count}"}
完成map和reduce脚本后,从命令行对其进行测试。 请记住使用chmod +x
将这些文件更改为可执行文件。 首先生成一个输入文件,如清单6所示。
# echo "Hadoop is an implementation of the map reduce framework for " \"distributed processing of large data sets." > input
#
使用此输入,您现在可以测试您的mapper脚本,如清单7所示。回想一下,该脚本只是将输入标记化为键值对,其中每个值将为1
(非唯一输入)。
# cat input | ruby map.rb
Hadoop 1
is 1
an 1
implementation 1
of 1
the 1
map 1
reduce 1
framework 1
for 1
distributed 1
processing 1
of 1
large 1
data 1
sets. 1
#
到目前为止,一切都很好。 现在,以原始流形式(Linux®管道)将整个应用程序组合在一起。 在清单8中,您将输入传递给地图脚本,对输出进行排序(可选步骤),然后将所得的中间数据传递给reducer脚本。
# cat input | ruby map.rb | sort | ruby reduce.rb
large 1
of 2
framework 1
distributed 1
data 1
an 1
the 1
reduce 1
map 1
sets. 1
Hadoop 1
implementation 1
for 1
processing 1
is 1
#
在外壳环境中按预期方式运行地图和归约脚本后,将其与Hadoop一起进行测试。 我将跳过Hadoop设置任务(请参阅本系列的第1 部分或第2部分 ,以启动并运行Hadoop)。
第一步是在HDFS中为您的输入数据创建一个输入目录,然后提供一个示例文件,您将在该文件上测试脚本。 清单9演示了此步骤(有关这些步骤的更多信息,请参见第1部分或第2部分 )。
# hadoop fs -mkdir input
# hadoop dfs -put /usr/src/linux-source-2.6.27/Documentation/memory-barriers.txt input
# hadoop fs -ls input
Found 1 items
-rw-r--r-- 1 root supergroup 78031 2010-06-04 17:36 /user/root/input/memory-barriers.txt
#
接下来,使用streaming
实用程序,使用自定义脚本调用Hadoop,指定输入数据和输出位置(请参见清单10)。 请注意,在此示例中, -file
选项只是告诉Hadoop将Ruby脚本打包为作业提交的一部分。
# hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.2+228-streaming.jar \-file /home/mtj/ruby/map.rb -mapper /home/mtj/ruby/map.rb \-file /home/mtj/ruby/reduce.rb -reducer /home/mtj/ruby/reduce.rb \-input input/* -output output
packageJobJar: [/home/mtj/ruby/map.rb, /home/mtj/ruby/reduce.rb, /var/lib/hadoop-0.20/...
10/06/04 17:42:38 INFO mapred.FileInputFormat: Total input paths to process : 1
10/06/04 17:42:39 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/...
10/06/04 17:42:39 INFO streaming.StreamJob: Running job: job_201006041053_0001
10/06/04 17:42:39 INFO streaming.StreamJob: To kill this job, run:
10/06/04 17:42:39 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job ...
10/06/04 17:42:39 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/...
10/06/04 17:42:40 INFO streaming.StreamJob: map 0% reduce 0%
10/06/04 17:43:17 INFO streaming.StreamJob: map 100% reduce 0%
10/06/04 17:43:26 INFO streaming.StreamJob: map 100% reduce 100%
10/06/04 17:43:29 INFO streaming.StreamJob: Job complete: job_201006041053_0001
10/06/04 17:43:29 INFO streaming.StreamJob: Output: output
#
最后,通过hadoop
实用程序使用cat
文件系统操作探索输出(请参见清单11)。
# hadoop fs -ls /user/root/output
Found 2 items
drwxr-xr-x - root supergroup 0 2010-06-04 17:42 /user/root/output/_logs
-rw-r--r-- 1 root supergroup 23014 2010-06-04 17:43 /user/root/output/part-00000
# hadoop fs -cat /user/root/output/part-00000 | head -12
+--->| 4
immediate 2
Alpha) 1
enable 1
_mandatory_ 1
Systems 1
DMA. 2
AMD64 1
{*C,*D}, 2
certainly 2
back 2
this 23
#
因此,在不到30行的脚本中,您已经实现了map
和reduce
元素,并演示了它们在Hadoop框架内的执行。 这是一个简单的示例,但它说明了Hadoop背后的强大功能,以及为什么它成为使用自定义或专有算法处理大型数据集的如此流行的框架。
Hadoop可以用于许多应用程序中,而不仅仅是简单地计算大型数据集的字数。 所需要做的就是以Hadoop基础架构可以使用的矢量形式表示数据。 尽管规范示例使用向量表示作为键和值,但是如何定义值(例如,多个值的集合)没有任何限制。 这种灵活性可以为Hadoop提供更丰富的应用程序的新机会。
正好适合MapReduce字数模型的一个有趣的应用程序是制表Web服务器访问的频率(在开创性的Google论文中讨论)。 对于此应用程序,URL用作键(从Web服务器访问日志中提取)。 reduce
过程的结果是基于Web服务器日志的给定Web站点每个URL的访问总数。
在机器学习应用程序中,Hadoop已被用作扩展遗传算法以处理大量GA个体的方法(潜在解决方案)。 map
过程执行传统的遗传算法,从本地池中寻求最佳个体解决方案。 然后,reduce应用程序将成为地图阶段中各个解决方案的竞赛。 这允许各个节点确定其最佳解决方案,然后允许这些解决方案在分布式的优胜劣汰展示中在还原阶段竞争。
创建了另一个有趣的应用程序来识别垃圾邮件的僵尸网络。 此过程的第一步是对电子邮件进行分类,以减少它们(基于一组指纹)为来自给定组织。 根据此过滤后的数据,为以某种方式连接的电子邮件(例如,引用电子邮件正文中的相同链接)构建了一个图形。 然后,将这些相关的电子邮件减少到主机(静态或动态IP地址)以识别相关的僵尸网络。
在通过地图和简化图元查看世界的应用程序之外,Hadoop可用作在计算机集群之间分配工作的一种方式。 映射和简化不一定强制特定类型的应用程序。 取而代之的是,可以将Hadoop视为将数据和算法分配到主机以进行更快的并行处理的一种方式。
尽管Hadoop提供了一个灵活的框架,但其他应用程序也可以将其接口转换为其他应用程序。 一个有趣的示例称为Hive,它是一种具有自己的查询语言(称为Hive QL )的数据仓库基础结构。 Hive使Hadoop对具有结构化查询语言(SQL)背景的人更加熟悉,但是它也支持传统的MapReduce基础结构来进行数据处理。
HBase是驻留在HDFS之上的另一个有趣的应用程序。 这是一个类似于Google BigTable的高性能数据库系统。 HBase代替了传统的文件处理,使数据库表成为MapReduce处理的输入和输出形式。
最后, Pig是Hadoop上用于分析大型数据集的平台。 Pig提供了一种高级语言,可以进行编译以映射和减少应用程序。
Hadoop 系列的最后一篇文章探讨了针对Hadoop框架的地图开发和减少Ruby中的应用程序。 希望从本文中,您可以看到Hadoop的真正功能。 尽管Hadoop限制您使用特定的编程模型,但是该模型非常灵活,可以应用于大量应用程序。
翻译自: https://www.ibm.com/developerworks/java/library/l-hadoop-3/index.html