热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据之mapreduce小实战

手写wordcount的程序1、pom.xml

手写wordcount的程序

1、pom.xml

<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>2.7.3version>
dependency>
dependencies>

2、新建Mapper类

package com.hsiehchou.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 海量数据
*
* hello hsiehchou
* nihao
*
* 数据的输入与输出以Key value进行传输
* keyIN:LongWritable(Long) 数据的起始偏移量
* valuewIN:具体数据
*
* mapper需要把数据传递到reducer阶段&#xff08;&#xff09;
* keyOut:单词 Text
* valueOut:出现的次数IntWritable
*
*/
public class WordCountMapper extends Mapper, Text, Text, IntWritable> {
//对数据进行打散 ctrl&#43;o
&#64;Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、接入数据 hello nihao
String line &#61; value.toString();
//2、对数据进行切分
String[] words &#61; line.split(" ");
//3、写出以
for (String w:words){
//写出reducer端
context.write(new Text(w), new IntWritable(1));
}
}
}

3、新建Reducer类

package com.hsiehchou.wordcount;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer阶段接收的是Mapper输出的数据
* mapper的输出是reducer输入
*
* keyIn:mapper输出的key的类型
* valueIn:mapper输出的value的类型
*
* reducer端输出的数据类型&#xff0c;想要一个什么样的结果
* keyOut:Text
* valueOut:IntWritalble
*
*/
public class WordCountReducer extends Reducer, IntWritable, Text, IntWritable> {
//key-->单词 value-->次数
&#64;Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1、记录出现的次数
int sum &#61; 0;
for (IntWritable v:values){
sum &#43;&#61; v.get();
}
//2、l累加求和输出
context.write(key, new IntWritable(sum));
}
}

4、新建驱动类

package com.hsiehchou.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、创建job任务
Configuration conf &#61; new Configuration();
Job job &#61; Job.getInstance(conf);
//2、指定jar包位置
job.setJarByClass(WordCountDriver.class);
//3、关联使用的Mapper类
job.setMapperClass(WordCountMapper.class);
//4、关联使用的Reducer类
job.setReducerClass(WordCountReducer.class);
//5、设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//6、设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7、设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//8设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//9、提交任务
boolean rs &#61; job.waitForCompletion(true);
System.exit(rs ? 0:1);
}
}
运行结果

[root&#64;hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.wordcount.WordCountDriver /wc/in /wc/out
[root&#64;hsiehchou121 ~]# hdfs dfs -cat /wc/out/part-r-00000
fd 1
fdgs 1
fdsbv 1
gd 1
hello 3

5、IDEA的相关使用

Ctrl&#43;O导入相关未实现的方法
Maven中的Lifecycle的package可以直接打包成jar

案例分析

需求&#xff1a;运营商流量日志
10086
计算每个用户当前使用的总流量
思路&#xff1f;总流量 &#61; 上行流量&#43;下行流量
三个字段&#xff1a;手机号 上行流量 下行流量
技术选型&#xff1a;PB&#43;
数据分析&#xff1a;海量数据(存储hdfs)
海量数据计算(分布式计算框架MapReduce)

实现

FlowBean类

package com.hsiehchou.logs;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 封装数据类型需要怎么做
* hadoop数据类型实现了序列化接口
* 如果自定义需要实现这个序列化接口
*/
public class FlowBean implements Writable {
//定义属性&#xff1a;上行流量 下行流量 总流量总和
private long upFlow;
private long dfFlow;
private long flowsum;
public FlowBean(){}
public FlowBean(long upFlow, long dfFlow){
this.upFlow &#61; upFlow;
this.dfFlow &#61; dfFlow;
this.flowsum &#61; upFlow &#43; dfFlow;
}
public long getUpFlow(){
return upFlow;
}
public void setUpFlow(long upFlow){
this.upFlow &#61; upFlow;
}
public long getDfFlow(){
return dfFlow;
}
public void setDfFlow(long dfFlow){
this.dfFlow &#61; dfFlow;
}
public long getFlowsum(){
return flowsum;
}
public void setFlowsum(long flowsum){
this.flowsum &#61; flowsum;
}
//序列化
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dfFlow);
out.writeLong(flowsum);
}
//反序列化
public void readFields(DataInput in) throws IOException {
upFlow &#61; in.readLong();
dfFlow &#61; in.readLong();
flowsum &#61; in.readLong();
}
&#64;Override
public String toString() {
return upFlow &#43; "\t" &#43; dfFlow &#43; "\t" &#43; flowsum;
}
}
FlowCountMapper类

package com.hsiehchou.logs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* keyIN:
* valueIN:
*
* 思路&#xff1a;根据想要的结果的kv类型 手机号 流量总和&#xff08;上行&#43;下行&#xff09;自定义类
* keyOut:
* valueOut:
*/
public class FlowCountMapper extends Mapper, Text, Text, FlowBean> {
&#64;Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、接入数据源
String line &#61; value.toString();
//2、切割 \t
String[] fields &#61; line.split("\t");
//3、拿到关键字段
String phoneNr &#61; fields[1];
long upFlow &#61; Long.parseLong(fields[fields.length - 3]);
long dfFlow &#61; Long.parseLong(fields[fields.length - 2]);
//4、写出到reducer
context.write(new Text(phoneNr), new FlowBean(upFlow,dfFlow));
}
}
FlowCountReducer类

package com.hsiehchou.logs;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer, FlowBean, Text, FlowBean> {
&#64;Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlow_sum &#61; 0;
long dfFlow_sum &#61; 0;
for (FlowBean v:values){
upFlow_sum &#43;&#61; v.getUpFlow();
dfFlow_sum &#43;&#61; v.getDfFlow();
}
FlowBean rsSum &#61; new FlowBean(upFlow_sum, dfFlow_sum);
//输出结果
context.write(key, rsSum);
}
}
FlowCountDriver类

package com.hsiehchou.logs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建job任务
Configuration conf &#61; new Configuration();
Job job &#61; Job.getInstance(conf);
//2.指定kjar包位置
job.setJarByClass(FlowCountDriver.class);
//3.关联使用的Mapper
job.setMapperClass(FlowCountMapper.class);
//4.关联使用的Reducer类
job.setReducerClass(FlowCountReducer.class);
//5.设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//6.设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//优化含有大量小文件的数据
//设置读取数据切片的类
job.setInputFormatClass(CombineTextInputFormat.class);
//最大切片大小8M
CombineTextInputFormat.setMaxInputSplitSize(job, 8388608);
//最小切片大小6M
CombineTextInputFormat.setMinInputSplitSize(job, 6291456);
//7.设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//8.设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//9.提交任务
boolean rs &#61; job.waitForCompletion(true);
System.exit(rs? 0:1);
}
}
运行结果

[root&#64;hsiehchou121 ~]# hdfs dfs -mkdir -p /flow/in
[root&#64;hsiehchou121 ~]# hdfs dfs -put HTTP_20180313143750.dat /flow/in
[root&#64;hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.logs.FlowCountDriver /flow/in /flow/out
[root&#64;hsiehchou121 ~]# hdfs dfs -cat /flow/out/part-r-00000
13480253104 120 1320 1440
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856
13602846565 198 910 1108
13660577991 660 690 1350
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240
13822544101 264 0 264
13884138413 4116 1432 5548
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

小文件优化

如果企业中存在海量的小文件数据
TextInputFormat按照文件规划切片&#xff0c;文件不管多小都是一个单独的切片&#xff0c;启动mapt
ask任务去执行&#xff0c;这样会产生大量的maptask&#xff0c;浪费资源。
优化手段&#xff1a;
小文件合并大文件&#xff0c;如果不动这个小文件内容。

转:https://www.cnblogs.com/hsiehchou/p/10403452.html



推荐阅读
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • 如何提升Python处理约1GB数据集时的运行效率?
    如何提升Python处理约1GB数据集时的运行效率?本文探讨了在后端开发中使用Python处理大规模数据集的优化方法。通过分析常见的性能瓶颈,介绍了多种提高数据处理速度的技术,包括使用高效的数据结构、并行计算、内存管理和代码优化策略。此外,文章还提供了在Ubuntu环境下配置和测试这些优化方案的具体步骤,适用于从事推荐系统等领域的开发者。 ... [详细]
  • 工作原理_一文理解 Spark 基础概念及工作原理
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了一文理解Spark基础概念及工作原理相关的知识,希望对你有一定的参考价值。 ... [详细]
  • Hadoop的分布式架构改进与应用
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 阿里云大数据计算服务MaxCompute (原名 ODPS)
     MaxCompute是阿里EB级计算平台,经过十年磨砺,它成为阿里巴巴集团数据中台的计算核心和阿里云大数据的基础服务。去年MaxCompute做了哪些工作,这些工作背后的原因是什 ... [详细]
  • 本文整理了Java中org.apache.hadoop.mapreduce.lib.input.MultipleInputs.addInputPath()方法的一些代码 ... [详细]
  • 安装hadoop2.9.2jdk1.8centos7
    安装JDK1.8查看JDK1.8的安装https:www.cnblogs.comTJ21p13208514.html安装hadoop上传hadoop下载hadoop地址http:m ... [详细]
author-avatar
葛菁昱
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有