热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

hadoop学习(四)Map/Reduce数据分析简述示例电话通讯清单

假如我们集群和伪分布式hadoop系统已经搭建完毕。我们都会根据官网或一些资料提供的wordcount函数来测试我们系统是否能正常工作。假设,我们在执行wordcount函数,都没

    假如我们集群和伪分布式hadoop系统已经搭建完毕。我们都会根据官网或一些资料提供的wordcount函数来测试我们系统是否能正常工作。假设,我们在执行wordcount函数,都没有问题。那我们就可以开始写M/R程序,开始数据分析了。

   因为,hadoop集群,还有其他一些组件需要我们去安装,这里还没有涉及,暂时不考虑。你要做的就是,把要分析的数据上传到HDFS中。至于其余组件,遇到的时候,在学习。这里对概念,不做太多的介绍。必要的概念,和程序执行步骤,这个是必须了解的。

任务要求:分析通话记录,查处每个手机号码有哪些打过来的号码

  — 现有电话通讯清单,记录用户A打给用户B的记录

  — 查找出没有电话号码,都有哪些电话来了

例如:120   13688893333 13733883333 1393302942  

说明:120 有后面3个电话打过来

要实现上述功能,传统处理方面同样也可以实现,但当数据大到一定程度,就会遇到瓶颈:

1、通过c,java程序直接截取上面的数据。

2、把数据直接导入数据库中,直接select也能解决问题。

3、但当数据TB级别的时候,简单的计算的话,传统方法就会遇到瓶颈。这时候M/R就起到作用了,hadoop会将我们上传到HDFS中的数据自动的分配到每一台机器中。

4、mapper程序,开始按行读取数据: 一行行的读入,然后做的是:一个个的分割原始数据、输出所需要数据,处理异常数据(导致数据的崩溃,如何处理异常数据)最后输出到HDFS上。每一行数据都会运行这个mapper,最后这个mapper输出到hdfs上。最后得到的结果在输出到hdfs上。

5、可以没有reduce函数,根据情况而定。不带mapper函数的输出发送到输出文件,map函数的输出格式必须与程序输出格式一致。

6、带有reduce函数任务,系统首先把mapper中输出的key相同的部分都发送到同一个reduce,然后再把reduce函数的结果输出,map函数的输出格式必须和reduce函数的输入格式一致。



Mapper的功能:


1、分割原始数据


2、输出所需数据


3、处理异常数据
Reduce的功能:
1、某一台机器负责一些key,最后通过reduce函数汇总,输出:
源数据:

13599999999 10086
13633333333 120
12678998778 dada13690378933
13833238233 13690378933
12678998778 120
19837366873 10086
16389323733 10086
18373984487 120
19877367388 13690378933
18937933444 13840238422
输出数据:中间用制表符分隔

10086 13599999999|19837366873|16389323733|
120 13633333333|12678998778|18373984487|
13690378933 13833238233|19877367388|
13840238422 18937933444|
测试代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class PhoneTest extends Configured implements Tool {
enum Counter {
LINESKIP; // 出错的行
}
@Override
public int run(String[] args) throws Exception {
Configuration cOnf= getConf();
Job job = new Job(conf, "PhoneTest"); // 任务名
job.setJarByClass(PhoneTest.class); // 指定Class
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/home/zhangzhen/input")); // 输入路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/home/zhangzhen/output")); // 输出路径

job.setMapperClass(Map.class); // 调用Map类作为Mapper任务代码
job.setReducerClass(Reduce.class); // 调用Reduce类作为Reducer任务代码
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class); // 指定输出的Key的格式(KEYOUT)
job.setOutputValueClass(Text.class); // 指定输出的Value的格式(VALUEOUT)
job.waitForCompletion(true);

return job.isSuccessful() ? 0 : 1;
}
public static class Map extends
Mapper { //
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
try {
// key - 行号 value - 一行的文本
String line = value.toString(); //13510000000 10086(13510000000拨打10086)
// 数据处理
String[] lineSplit = line.split(" ");
String phone1 = lineSplit[0];
String phone2 = lineSplit[1];

context.write(new Text(phone2), new Text(phone1)); // 输出 key \t value
} catch (java.lang.ArrayIndexOutOfBoundsException e) {
context.getCounter(Counter.LINESKIP).increment(1); // 出错令计数器+1
}
}
} public static class Reduce extends Reducer {

@Override
protected void reduce(Text key, Iterable values,
Context context)
throws IOException, InterruptedException {
String valueStr;
String out = "";
for(Text value:values){
valueStr = value.toString() + "|";
out += valueStr;
}
// 输出 key \t value(如果我们的输出结果不是key \t value格式,那么我们的key可定义为NullWritable,而value使用key与value的组合。)
context.write(key, new Text(out));
}
} public static void main(String[] args) throws Exception {
//运行任务
int res = ToolRunner.run(new Configuration(), new PhoneTest(), args);
System.exit(res);
}
}
eclipse终端输出:

14/01/22 10:44:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/01/22 10:44:47 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/01/22 10:44:47 INFO input.FileInputFormat: Total input paths to process : 1
14/01/22 10:44:47 WARN snappy.LoadSnappy: Snappy native library not loaded
14/01/22 10:44:48 INFO mapred.JobClient: Running job: job_local1140955208_0001
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Waiting for map tasks
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Starting task: attempt_local1140955208_0001_m_000000_0
14/01/22 10:44:48 INFO util.ProcessTree: setsid exited with exit code 0
14/01/22 10:44:48 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@99548b
14/01/22 10:44:48 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/home/zhangzhen/input/tonghua.txt:0+202
14/01/22 10:44:48 INFO mapred.MapTask: io.sort.mb = 100
14/01/22 10:44:48 INFO mapred.MapTask: data buffer = 79691776/99614720
14/01/22 10:44:48 INFO mapred.MapTask: record buffer = 262144/327680
14/01/22 10:44:48 INFO mapred.MapTask: Starting flush of map output
14/01/22 10:44:48 INFO mapred.MapTask: Finished spill 0
14/01/22 10:44:48 INFO mapred.Task: Task:attempt_local1140955208_0001_m_000000_0 is done. And is in the process of commiting
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Task: Task ‘attempt_local1140955208_0001_m_000000_0‘ done.
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Finishing task: attempt_local1140955208_0001_m_000000_0
14/01/22 10:44:48 INFO mapred.LocalJobRunner: Map task executor complete.
14/01/22 10:44:48 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1457048
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Merger: Merging 1 sorted segments
14/01/22 10:44:48 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 194 bytes
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Task: Task:attempt_local1140955208_0001_r_000000_0 is done. And is in the process of commiting
14/01/22 10:44:48 INFO mapred.LocalJobRunner:
14/01/22 10:44:48 INFO mapred.Task: Task attempt_local1140955208_0001_r_000000_0 is allowed to commit now
14/01/22 10:44:48 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local1140955208_0001_r_000000_0‘ to hdfs://localhost:9000/home/zhangzhen/output
14/01/22 10:44:49 INFO mapred.LocalJobRunner: reduce > reduce
14/01/22 10:44:49 INFO mapred.Task: Task ‘attempt_local1140955208_0001_r_000000_0‘ done.
14/01/22 10:44:49 INFO mapred.JobClient: map 100% reduce 100%
14/01/22 10:44:49 INFO mapred.JobClient: Job complete: job_local1140955208_0001
14/01/22 10:44:49 INFO mapred.JobClient: Counters: 23
14/01/22 10:44:49 INFO mapred.JobClient: PhoneAnalyzer$Counter
14/01/22 10:44:49 INFO mapred.JobClient: LINESKIP=1
14/01/22 10:44:49 INFO mapred.JobClient: File Output Format Counters
14/01/22 10:44:49 INFO mapred.JobClient: Bytes Written=146
14/01/22 10:44:49 INFO mapred.JobClient: File Input Format Counters
14/01/22 10:44:49 INFO mapred.JobClient: Bytes Read=202
14/01/22 10:44:49 INFO mapred.JobClient: FileSystemCounters
14/01/22 10:44:49 INFO mapred.JobClient: FILE_BYTES_READ=568
14/01/22 10:44:49 INFO mapred.JobClient: HDFS_BYTES_READ=404
14/01/22 10:44:49 INFO mapred.JobClient: FILE_BYTES_WRITTEN=136400
14/01/22 10:44:49 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=146
14/01/22 10:44:49 INFO mapred.JobClient: Map-Reduce Framework
14/01/22 10:44:49 INFO mapred.JobClient: Reduce input groups=4
14/01/22 10:44:49 INFO mapred.JobClient: Map output materialized bytes=198
14/01/22 10:44:49 INFO mapred.JobClient: Combine output records=0
14/01/22 10:44:49 INFO mapred.JobClient: Map input records=10
14/01/22 10:44:49 INFO mapred.JobClient: Reduce shuffle bytes=0
14/01/22 10:44:49 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
14/01/22 10:44:49 INFO mapred.JobClient: Reduce output records=4
14/01/22 10:44:49 INFO mapred.JobClient: Spilled Records=18
14/01/22 10:44:49 INFO mapred.JobClient: Map output bytes=174
14/01/22 10:44:49 INFO mapred.JobClient: Total committed heap usage (bytes)=340262912
14/01/22 10:44:49 INFO mapred.JobClient: CPU time spent (ms)=0
14/01/22 10:44:49 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
14/01/22 10:44:49 INFO mapred.JobClient: SPLIT_RAW_BYTES=119
14/01/22 10:44:49 INFO mapred.JobClient: Map output records=9
14/01/22 10:44:49 INFO mapred.JobClient: Combine input records=0
14/01/22 10:44:49 INFO mapred.JobClient: Reduce input records=9
运行结束后,你可以通过你设置的路径找到输出的文件,这里可以查看输出的文件;同样你也可以在eclipse中,如下图刷新DFS目录,就可以看到input和output两个目录,目录下的文件也可以在eclipse中打开查阅。
测试截图:
bubuko.com,布布扣
bubuko.com,布布扣
bubuko.com,布布扣

Copyright?BUAA



推荐阅读
  • 本文详细介绍了PHP中的几种超全局变量,包括$GLOBAL、$_SERVER、$_POST、$_GET等,并探讨了AJAX的工作原理及其优缺点。通过具体示例,帮助读者更好地理解和应用这些技术。 ... [详细]
  • 使用 ModelAttribute 实现页面数据自动填充
    本文介绍了如何利用 Spring MVC 中的 ModelAttribute 注解,在页面跳转后自动填充表单数据。主要探讨了两种实现方法及其背后的原理。 ... [详细]
  • 使用IntelliJ IDEA高效开发与运行Shell脚本
    本文介绍了如何利用IntelliJ IDEA中的BashSupport插件来增强Shell脚本的开发体验,包括插件的安装、配置以及脚本的运行方法。 ... [详细]
  • 本文详细介绍了Python中的生成器表达式、列表推导式、字典推导式及集合推导式等,探讨了它们之间的差异,并提供了丰富的代码示例。 ... [详细]
  • 利用Cookie实现用户登录状态的持久化
    本文探讨了如何使用Cookie技术在Web应用中实现用户登录状态的持久化,包括Cookie的基本概念、优势及主要操作方法,并通过一个简单的Java Web项目示例展示了具体实现过程。 ... [详细]
  • 探索OpenWrt中的LuCI框架
    本文深入探讨了OpenWrt系统中轻量级HTTP服务器uhttpd的工作原理及其配置,重点介绍了LuCI界面的实现机制。 ... [详细]
  • 本文概述了在GNU/Linux系统中,动态库在链接和运行阶段的搜索路径及其指定方法,包括通过编译时参数、环境变量及系统配置文件等方式来控制动态库的查找路径。 ... [详细]
  • 使用REM和媒体查询实现响应式布局
    本文介绍如何利用REM单位和媒体查询(Media Queries)来创建适应不同屏幕尺寸的网页布局。通过具体示例,展示在不同屏幕宽度下如何调整页面元素的样式。 ... [详细]
  • 本文详细对比了HashMap和HashTable在多线程环境下的安全性、对null值的支持、性能表现以及方法同步等方面的特点,帮助开发者根据具体需求选择合适的数据结构。 ... [详细]
  • selenium通过JS语法操作页面元素
    做过web测试的小伙伴们都知道,web元素现在很多是JS写的,那么既然是JS写的,可以通过JS语言去操作页面,来帮助我们操作一些selenium不能覆盖的功能。问题来了我们能否通过 ... [详细]
  • IIS6批量添加主机头,修改IIS数据库
    首先,找到IIS的数据库。默认是在C:\WINDOWS\system32\inetsrv下的MetaBase.xml文件。如果找不到,请右键右键站点-》所有服务-》将配置保存到一个 ... [详细]
  • 本文介绍了基于Java的在线办公工作流系统的毕业设计方案,涵盖了MyBatis框架的应用、源代码分析、调试与部署流程、数据库设计以及相关论文撰写指导。 ... [详细]
  • 本文探讨了Java编程语言中常用的两个比较操作符==和equals方法的区别及其应用场景。通过具体示例分析,帮助开发者更好地理解和使用这两个概念,特别是在处理基本数据类型和引用数据类型的比较时。 ... [详细]
  • 本文详细介绍了在PHP中如何获取和处理HTTP头部信息,包括通过cURL获取请求头信息、使用header函数发送响应头以及获取客户端HTTP头部的方法。同时,还探讨了PHP中$_SERVER变量的使用,以获取客户端和服务器的相关信息。 ... [详细]
  • 将XML数据迁移至Oracle Autonomous Data Warehouse (ADW)
    随着Oracle ADW的推出,数据迁移至ADW成为业界关注的焦点。特别是XML和JSON这类结构化数据的迁移需求日益增长。本文将通过一个实际案例,探讨如何高效地将XML数据迁移至ADW。 ... [详细]
author-avatar
N01小贱_652
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有