热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据之mapreduce小实战

手写wordcount的程序1、pom.xml

手写wordcount的程序

1、pom.xml

<dependencies>
<dependency>
<groupId>org.apache.hadoopgroupId>
<artifactId>hadoop-clientartifactId>
<version>2.7.3version>
dependency>
dependencies>

2、新建Mapper类

package com.hsiehchou.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 海量数据
*
* hello hsiehchou
* nihao
*
* 数据的输入与输出以Key value进行传输
* keyIN:LongWritable(Long) 数据的起始偏移量
* valuewIN:具体数据
*
* mapper需要把数据传递到reducer阶段&#xff08;&#xff09;
* keyOut:单词 Text
* valueOut:出现的次数IntWritable
*
*/
public class WordCountMapper extends Mapper, Text, Text, IntWritable> {
//对数据进行打散 ctrl&#43;o
&#64;Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、接入数据 hello nihao
String line &#61; value.toString();
//2、对数据进行切分
String[] words &#61; line.split(" ");
//3、写出以
for (String w:words){
//写出reducer端
context.write(new Text(w), new IntWritable(1));
}
}
}

3、新建Reducer类

package com.hsiehchou.wordcount;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reducer阶段接收的是Mapper输出的数据
* mapper的输出是reducer输入
*
* keyIn:mapper输出的key的类型
* valueIn:mapper输出的value的类型
*
* reducer端输出的数据类型&#xff0c;想要一个什么样的结果
* keyOut:Text
* valueOut:IntWritalble
*
*/
public class WordCountReducer extends Reducer, IntWritable, Text, IntWritable> {
//key-->单词 value-->次数
&#64;Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//1、记录出现的次数
int sum &#61; 0;
for (IntWritable v:values){
sum &#43;&#61; v.get();
}
//2、l累加求和输出
context.write(key, new IntWritable(sum));
}
}

4、新建驱动类

package com.hsiehchou.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、创建job任务
Configuration conf &#61; new Configuration();
Job job &#61; Job.getInstance(conf);
//2、指定jar包位置
job.setJarByClass(WordCountDriver.class);
//3、关联使用的Mapper类
job.setMapperClass(WordCountMapper.class);
//4、关联使用的Reducer类
job.setReducerClass(WordCountReducer.class);
//5、设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//6、设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7、设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//8设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//9、提交任务
boolean rs &#61; job.waitForCompletion(true);
System.exit(rs ? 0:1);
}
}
运行结果

[root&#64;hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.wordcount.WordCountDriver /wc/in /wc/out
[root&#64;hsiehchou121 ~]# hdfs dfs -cat /wc/out/part-r-00000
fd 1
fdgs 1
fdsbv 1
gd 1
hello 3

5、IDEA的相关使用

Ctrl&#43;O导入相关未实现的方法
Maven中的Lifecycle的package可以直接打包成jar

案例分析

需求&#xff1a;运营商流量日志
10086
计算每个用户当前使用的总流量
思路&#xff1f;总流量 &#61; 上行流量&#43;下行流量
三个字段&#xff1a;手机号 上行流量 下行流量
技术选型&#xff1a;PB&#43;
数据分析&#xff1a;海量数据(存储hdfs)
海量数据计算(分布式计算框架MapReduce)

实现

FlowBean类

package com.hsiehchou.logs;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 封装数据类型需要怎么做
* hadoop数据类型实现了序列化接口
* 如果自定义需要实现这个序列化接口
*/
public class FlowBean implements Writable {
//定义属性&#xff1a;上行流量 下行流量 总流量总和
private long upFlow;
private long dfFlow;
private long flowsum;
public FlowBean(){}
public FlowBean(long upFlow, long dfFlow){
this.upFlow &#61; upFlow;
this.dfFlow &#61; dfFlow;
this.flowsum &#61; upFlow &#43; dfFlow;
}
public long getUpFlow(){
return upFlow;
}
public void setUpFlow(long upFlow){
this.upFlow &#61; upFlow;
}
public long getDfFlow(){
return dfFlow;
}
public void setDfFlow(long dfFlow){
this.dfFlow &#61; dfFlow;
}
public long getFlowsum(){
return flowsum;
}
public void setFlowsum(long flowsum){
this.flowsum &#61; flowsum;
}
//序列化
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dfFlow);
out.writeLong(flowsum);
}
//反序列化
public void readFields(DataInput in) throws IOException {
upFlow &#61; in.readLong();
dfFlow &#61; in.readLong();
flowsum &#61; in.readLong();
}
&#64;Override
public String toString() {
return upFlow &#43; "\t" &#43; dfFlow &#43; "\t" &#43; flowsum;
}
}
FlowCountMapper类

package com.hsiehchou.logs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* keyIN:
* valueIN:
*
* 思路&#xff1a;根据想要的结果的kv类型 手机号 流量总和&#xff08;上行&#43;下行&#xff09;自定义类
* keyOut:
* valueOut:
*/
public class FlowCountMapper extends Mapper, Text, Text, FlowBean> {
&#64;Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、接入数据源
String line &#61; value.toString();
//2、切割 \t
String[] fields &#61; line.split("\t");
//3、拿到关键字段
String phoneNr &#61; fields[1];
long upFlow &#61; Long.parseLong(fields[fields.length - 3]);
long dfFlow &#61; Long.parseLong(fields[fields.length - 2]);
//4、写出到reducer
context.write(new Text(phoneNr), new FlowBean(upFlow,dfFlow));
}
}
FlowCountReducer类

package com.hsiehchou.logs;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer, FlowBean, Text, FlowBean> {
&#64;Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlow_sum &#61; 0;
long dfFlow_sum &#61; 0;
for (FlowBean v:values){
upFlow_sum &#43;&#61; v.getUpFlow();
dfFlow_sum &#43;&#61; v.getDfFlow();
}
FlowBean rsSum &#61; new FlowBean(upFlow_sum, dfFlow_sum);
//输出结果
context.write(key, rsSum);
}
}
FlowCountDriver类

package com.hsiehchou.logs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建job任务
Configuration conf &#61; new Configuration();
Job job &#61; Job.getInstance(conf);
//2.指定kjar包位置
job.setJarByClass(FlowCountDriver.class);
//3.关联使用的Mapper
job.setMapperClass(FlowCountMapper.class);
//4.关联使用的Reducer类
job.setReducerClass(FlowCountReducer.class);
//5.设置mapper阶段输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//6.设置reducer阶段输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//优化含有大量小文件的数据
//设置读取数据切片的类
job.setInputFormatClass(CombineTextInputFormat.class);
//最大切片大小8M
CombineTextInputFormat.setMaxInputSplitSize(job, 8388608);
//最小切片大小6M
CombineTextInputFormat.setMinInputSplitSize(job, 6291456);
//7.设置数据输入的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//8.设置数据输出的路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//9.提交任务
boolean rs &#61; job.waitForCompletion(true);
System.exit(rs? 0:1);
}
}
运行结果

[root&#64;hsiehchou121 ~]# hdfs dfs -mkdir -p /flow/in
[root&#64;hsiehchou121 ~]# hdfs dfs -put HTTP_20180313143750.dat /flow/in
[root&#64;hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.logs.FlowCountDriver /flow/in /flow/out
[root&#64;hsiehchou121 ~]# hdfs dfs -cat /flow/out/part-r-00000
13480253104 120 1320 1440
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856
13602846565 198 910 1108
13660577991 660 690 1350
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240
13822544101 264 0 264
13884138413 4116 1432 5548
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

小文件优化

如果企业中存在海量的小文件数据
TextInputFormat按照文件规划切片&#xff0c;文件不管多小都是一个单独的切片&#xff0c;启动mapt
ask任务去执行&#xff0c;这样会产生大量的maptask&#xff0c;浪费资源。
优化手段&#xff1a;
小文件合并大文件&#xff0c;如果不动这个小文件内容。

转:https://www.cnblogs.com/hsiehchou/p/10403452.html



推荐阅读
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • Servlet多用户登录时HttpSession会话信息覆盖问题的解决方案
    本文讨论了在Servlet多用户登录时可能出现的HttpSession会话信息覆盖问题,并提供了解决方案。通过分析JSESSIONID的作用机制和编码方式,我们可以得出每个HttpSession对象都是通过客户端发送的唯一JSESSIONID来识别的,因此无需担心会话信息被覆盖的问题。需要注意的是,本文讨论的是多个客户端级别上的多用户登录,而非同一个浏览器级别上的多用户登录。 ... [详细]
  • 目前Miniconda3的主要版本已经不支持python3.6,以Windows为例,在官网Miniconda—Condadocumentation中只有python3.7 ... [详细]
  • 五、RabbitMQ Java Client基本使用详解
    JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
author-avatar
葛菁昱
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有