热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop/R集成I:流处理

如果您平常一直使用MapReduce框架,那么您可能知道单词计数示例是MapReduce的相当于“HelloWorld!”的一个例子。在之前的帖子中

如果您平常一直使用MapReduce框架,那么您可能知道"单词计数示例"是MapReduce的相当于“Hello World!”的一个例子。在之前的帖子中,我试图稍作改动,但现在也有一个同样简单的问题 - 按州来计算,计算房利美(Fannie Mae)地产公司所募集的按揭证券的美元总价的新问题。

到目前为止,我已经使用了“直接的”Java和Pig,现在我将注意力转向R(语言).在这篇文章的例子完成之后,我们将讨论在该情况下R语言的独特之处,以及为什么字数统计类型的例子不会“真的做正义。在此之前,我会提及我在这里使用的主要参考资料是Joseph Adler的R in a Nutshell(请参阅第26章)和Tom White的Hadoop:权威指南(第2章)。

在这里我还是要推荐下我自己建的大数据学习交流qq裙:522189307 , 裙 里都是学大数据开发的,如果你正在学习大数据 ,小编欢迎你加入,大家都是软件开发党,不定期分享干货(只有大数据开发相关的),包括我自己整理的一份最新的大数据进阶资料和高级开发教程,欢迎进阶中和进想深入大数据的小伙伴。上述资料加群可以领取

有很多方法可以将R语言与Hadoop结合使用,其中包括:

Hadoop流媒体,这篇文章的主角
RHadoop,R/Hadoop的集成(请参阅RHadoop Wiki),这是将在未来发布的文章的主角。
RHIPE(发音为hree- pay),另一个R/Hadoop的集成。
由于我在本博客中试图涵盖的主题十分广泛,因此我将限制自己使用流式传输和RHadoop。

概览

在Hadoop流中,您的mapper,reducer和可选的组合器进程(combiner processes)被写入从标准输入读取并写入标准输出。一旦流程脚本和数据准备就绪,您只需使用带有一些命令行属性的流式二进制文件就能调用Hadoop。

正如在之前的文章中一样,我将从房利美的新问题统计库(NIPS,即New Issue Pool Statistics)文件中获取数据。想要了解更多信息,请参阅上 一篇文章。我将使用与该文章中相同的数据,因此我们可以期待结果能够与前面精确匹配。

The Mapper

NIPS文件有一点复杂,因为它们包含许多不同格式的记录(在 这里查看所有格式)。我们将查看记录类型9的数据来进行我们的分析,即“GEOGRAPHIC DISTRIBUTION(地理分布)”。我们感兴趣的是第3栏(州名)和第6栏(总的未付余额)。由于在单个文件中混合了多种记录格式,因此我们首先在管道定界符上将文件分割并丢弃非9类记录。我们需要做的就是输出状态名称和累加未付余额,每个类型9行包含了1个实例。

我使用RStudio来编写R脚本,这是一个我通过Coursera上的Roger Peng的Computing for Data Analysis课程了解到的IDE 。在RStudio中进行过交互式脚本构建会话后,我制作了以下测试脚本:

#! /usr/bin/env Rscript
conn <- file("/home/hduser/fannie-mae-nips/nips_12262012.txt", open&#61;"r")
while (length(next.line <- readLines(conn, n&#61;1)) > 0) {
split.line <- strsplit(next.line, "\\|")
if (as.numeric(split.line[[1]][2]) &#61;&#61; 9) {
write(paste(split.line[[1]][3],
gsub("[$,]", "", split.line[[1]][6]), sep&#61;"\t"), stdout())
}
}
close(conn)
然后我从shell中调用并得到以下输出&#xff0c;截取了一小段&#xff1a;

CALIFORNIA 167300.00
FLORIDA 395950.00
GEORGIA 69500.00
ILLINOIS 235200.00
MICHIGAN 781950.00
NEW JERSEY 284550.00
OHIO 334175.00
由于这看起来很干净&#xff0c;我稍微修改了mapper以生成最终版本&#xff1a;

#! /usr/bin/env Rscript
conn <- file("stdin", open&#61;"r")
while (length(next.line <- readLines(conn, n&#61;1)) > 0) {
split.line <- strsplit(next.line, "\\|")
if (as.numeric(split.line[[1]][2]) &#61;&#61; 9) {
write(paste(split.line[[1]][3],
gsub("[$,]", "", split.line[[1]][6]), sep&#61;"\t"), stdout())
}
}
close(conn)
请注意获取strsplit的结果的下标&#xff1a;strsplit返回一个列表&#xff0c;因此文件记录的第2个字段实际上是列表中第一个元素的元素2&#xff0c;它是解析字段的向量。如果您需要这个结果的详细说明&#xff0c;请参阅Phil Spector的Data Manipulation with R(使用R的数据操作)的“Subscripting(下标)”一章 _。另外请注意&#xff0c;gsubto的紧凑型使用可从汇总未付余额中删除美元符号和逗号。

The Reducer

我们的reducer也将从stdin中读取数据&#xff0c;其中Hadoop运行环境保证了以下内容&#xff1a;

如果reducer遇到一个关键字&#xff0c;那么就reducer知道带有该关键字的所有记录都被发送到了该reducer&#xff0c;所以它可以产生一个输出&#xff0c;并知道它已经被赋予了该关键字代表的所有记录;
陆续传入的记录会按键排序&#xff0c;因此reducer知道&#xff0c;当某个键发生更改时&#xff0c;未更改之前键的所有记录都已在流中遇到过。
在我们的reducer中&#xff0c;有两个变量&#xff1a;一个用于追踪哪个键正在被处理&#xff0c;另一个用于保存来自给定状态的抵押贷款的总的未支付余额。一旦某个键发生变化&#xff0c;我们将(使用其键)输出当前的running total出并重置running balance(解释看代码&#xff0c;running total和runing balance都是作者自己起的名字&#xff0c;并没有特殊含义)&#xff1a;

#! /usr/bin/env Rscript
current.key <- NA
current.upb <- 0.0
conn <- file("stdin", open&#61;"r")
while (length(next.line <- readLines(conn, n&#61;1)) > 0) {
split.line <- strsplit(next.line, "\t")
key <- split.line[[1]][1]
upb <- as.numeric(split.line[[1]][2])
if (is.na(current.key)) {
current.key <- key
current.upb <- upb
}
else {
if (current.key &#61;&#61; key) {
current.upb <- current.upb &#43; upb
}
else {
write(paste(current.key, current.upb, sep&#61;"\t"), stdout())
current.key <- key
current.upb <- upb
}
}
}
write(paste(current.key, current.upb, sep&#61;"\t"), stdout())
close(conn)
现在&#xff0c;如果我想在单个文件上测试这个reducer&#xff0c;但是我遇到了一个小问题 - 我的mapper没有对输出进行排序&#xff08;因为按常理来说不需要&#xff09;&#xff0c;但是我的reducer希望数据是按键排序的。我可以等着看最后的数字是怎么出来的&#xff0c;但由于流式传输只涉及stdin输出到标准输入&#xff0c;我有点好奇这个任务在Hadoop之外运行的速度可以有多快&#xff08;我没有真正去比较&#xff0c;针对简单的单节点集群; 我只是好奇&#xff09;。而且我还在学习R&#xff0c;所以接下来我编写了一个脚本来按记录键对行进行排序&#xff1a;

#! /usr/bin/env Rscript
conn <- file("stdin", open&#61;"r")
all.lines <- readLines(conn)
write(sort(all.lines), stdout())
close(conn)
在这种情况下&#xff0c;我能明确回忆起我为什么这么喜欢R语言(即你就能明白R的强大之处)&#xff01;接下来&#xff0c;我将使用mapper的“测试”版来处理单个文件&#xff1a;

./map.test.R | ./map.output.sorter.R | ./reduce.R
并为单个NIPS文件获得如下输出&#xff08;缩写&#xff09;&#xff1a;

ALABAMA 72699735.21
ALASKA 6883209.62
ARIZONA 287482321.1
ARKANSAS 21579003.98
CALIFORNIA 1811342276.77
...
VIRGIN ISLANDS 1021750
WASHINGTON 239648997.97
WEST VIRGINIA 9925894.94
WISCONSIN 72752945.87
WYOMING 6232557.56
使用R在Hadoop中进行流式传输

现在我们有了一个mapper和一个reducer&#xff0c;我们可以在Hadoop中处理整个数据集。我将处理与我之前的Hadoop-Java-Pig那个帖子中相同的数据集&#xff0c;即2012年8月23日至12月26日的NIPS数据。正如在那篇文章中所展示的&#xff0c;我以伪分布模式运行Hadoop&#xff0c;使用来自HDFS的数据。当然&#xff0c;这里的区别在于我指定了使用流式处理&#xff0c;并提供了我的mapper和Reducer R脚本。我从Hadoop主目录启动&#xff1a;

bin / hadoop jar $ HADOOP\_PREFIX / contrib / streaming / hadoop-streaming-1.1.0.jar -input / user / hduser / fannie-mae-nips -output / user / hduser / fannie-mae-nips -r-output -mapper /home/hduser/RScripts/map.R -reducer /home/hduser/RScripts/reduce.R
那么&#xff0c;我的努力取得了什么结果&#xff1f;从HDFS复制出我的结果文件&#xff1a;

bin / hadoop dfs -copyToLocal / user / hduser / fannie-mae-nips -r-output / part-00000 rResults.txt
产生以下输出&#xff08;这里是缩写&#xff09;&#xff1a;

ALABAMA 3242681838.89999
ALASKA 841797447.200001
ARIZONA 9340767235.06001
ARKANSAS 1452136751.9
CALIFORNIA 89114642822.0799
...
VERMONT 553060435.67
VIRGIN ISLANDS 33604327.46
VIRGINIA 12706719836.48
WASHINGTON 13194248475.54
WEST VIRGINIA 486889587.57
WISCONSIN 8140391871.79
WYOMING 720905726.84
我仍然保存着使用Java和Pig在这个相同的数据集上的输出; 仔细阅读此输出将凸显出以下输出&#xff08;请注意&#xff0c;由于数字以不同的格式输出&#xff0c;因此我没有区分这些文件&#xff09;&#xff1a;

ALABAMA 3.242681838899994E9
ALASKA 8.417974472000003E8
ARIZONA 9.340767235060005E9
ARKANSAS 1.452136751900001E9
CALIFORNIA 8.91146428220799E10
....
VERMONT 5.530604356700001E8
VIRGIN ISLANDS 3.360432746000001E7
VIRGINIA 1.2706719836479996E10
WASHINGTON 1.319424847554002E10
WEST VIRGINIA 4.868895875700002E8
WISCONSIN 8.140391871790002E9
WYOMING 7.209057268400007E8
因此&#xff0c;我成功地使用R和Hadoop流处理复制了使用Java和Pig进行的示例。

关于Hadoop和R的最终评论

如果你完全熟悉R&#xff0c;你就会明白R并不是一种你为了分割输出和数字求和而选择的语言; 该语言及其库包含丰富的功能。这篇文章的重点主要是过一遍R与Hadoop流处理的机械式细节(即使用R与流处理的固定步骤)。R真正发光的地方在于&#xff0c;如果是一些“繁重的工作”&#xff0c;R很容易就能将其分解为Mapper风格和Reducer风格的任务。例如&#xff0c;如果您正针对庞大的数据集进行线性回归操作&#xff0c;使用了大量的变量&#xff0c;或者如果您正在对大型数据集执行Shapiro-Wilk测试&#xff0c;则可以将作业分解为并行任务&#xff0c;最后将它们与Reducer相结合&#xff0c;这将成为Hadoop/R协同工作的一个很好的例子。有关R中的并行计算的更多信息&#xff0c;请查阅 R in a Nutshell&#xff0c;特别是他在本章最后的注明的“在哪里了解更多”部分。


推荐阅读
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了三种方法来实现在Win7系统中显示桌面的快捷方式,包括使用任务栏快速启动栏、运行命令和自己创建快捷方式的方法。具体操作步骤详细说明,并提供了保存图标的路径,方便以后使用。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了Android中的assets目录和raw目录的共同点和区别,包括获取资源的方法、目录结构的限制以及列出资源的能力。同时,还解释了raw目录中资源文件生成的ID,并说明了这些目录的使用方法。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • SpringMVC接收请求参数的方式总结
    本文总结了在SpringMVC开发中处理控制器参数的各种方式,包括处理使用@RequestParam注解的参数、MultipartFile类型参数和Simple类型参数的RequestParamMethodArgumentResolver,处理@RequestBody注解的参数的RequestResponseBodyMethodProcessor,以及PathVariableMapMethodArgumentResol等子类。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • VueCLI多页分目录打包的步骤记录
    本文介绍了使用VueCLI进行多页分目录打包的步骤,包括页面目录结构、安装依赖、获取Vue CLI需要的多页对象等内容。同时还提供了自定义不同模块页面标题的方法。 ... [详细]
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • 微软评估和规划(MAP)的工具包介绍及应用实验手册
    本文介绍了微软评估和规划(MAP)的工具包,该工具包是一个无代理工具,旨在简化和精简通过网络范围内的自动发现和评估IT基础设施在多个方案规划进程。工具包支持库存和使用用于SQL Server和Windows Server迁移评估,以及评估服务器的信息最广泛使用微软的技术。此外,工具包还提供了服务器虚拟化方案,以帮助识别未被充分利用的资源和硬件需要成功巩固服务器使用微软的Hyper - V技术规格。 ... [详细]
author-avatar
青春梦敲门砖
我滴妈呀!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有