热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReduce之分区案例的代码实现

在此案例中,我们使用学生成绩实现分区功能(第三列为成绩):将原始数据上传至HDFS[roothadoop01test_da

在此案例中,我们使用学生成绩实现分区功能(第三列为成绩):

  •  将原始数据上传至HDFS

[root@hadoop01 test_data]# hdfs dfs -mkdir /test_partition_input
[root@hadoop01 test_data]# hdfs dfs -put test_partiton.csv /test_partition_input

新建project:

  • 引入pom依赖


4.0.0wyh.testtest_partition1.0-SNAPSHOT88jarorg.apache.hadoophadoop-common2.7.5org.apache.hadoophadoop-client2.7.5org.apache.hadoophadoop-hdfs2.7.5org.apache.hadoophadoop-mapreduce-client-core2.7.5junitjunitRELEASEorg.apache.maven.pluginsmaven-compiler-plugin3.11.81.8UTF-8org.apache.maven.pluginsmaven-shade-plugin2.4.3packageshadetrue

  • 创建自定义Mapper类

package wyh.test.partition;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 四个泛型,分别是K1,V1,K2,V2的类型(要分区的字段必须包含在K2里),这里我们可以暂时将V2置空*/
public class PartitionMapper extends Mapper {/*** map(...)用于将K1,V1转为K2,V2,在我们的案例中,K2直接使用V1的值即可。* K1为行偏移量,V1为行数据* K2为V1的值,也即行数据* V2置空*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());//NullWritable.get()需要使用get()方法拿到NullWritable对应的对象}
}

  • 创建自定义Partitioner

package wyh.test.partition;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** 两个泛型依次对应K2,V2的类型*/
public class PartitionPartitioner extends Partitioner {/*** 该方法用于定义分区的具体规则,并返回分区的编号* @param text - K2* @param nullWritable - V2* @param i - 分区个数* @return*/@Overridepublic int getPartition(Text text, NullWritable nullWritable, int i) {//获取原始行数据,并截取成绩值String[] split = text.toString().split(",");String gradeString = split[2];int grade=Integer.parseInt(gradeString);//定义分区规则if(grade > 90){return 1;//返回分区编号}else{return 0;}}
}

  • 自定义Reducer类

package wyh.test.partition;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** 四个泛型依次为K2,V2,K3,V3的类型* K2 - 行数据* V2 - 置空* K3 - 行数据(在我们的案例中此处的Reduce中不需要对数据进行处理),直接将数据进行传递即可。* V3 - 置空*/
public class PartitionReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}
}

  • 创建主类

package wyh.test.partition;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class PartitionJobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {//创建job对象Job job = Job.getInstance(super.getConf(), "test_partition_job");//集群中必须配置job.setJarByClass(PartitionJobMain.class);//配置输入项job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_input"));//配置Mapjob.setMapperClass(PartitionMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//配置分区job.setPartitionerClass(PartitionPartitioner.class);//配置Reducejob.setReducerClass(PartitionReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132/test_partition_output"));//设置Reduce Task个数(默认是1),Reduce Task个数也即分区个数job.setNumReduceTasks(2);//等待job执行状态返回值boolean status = job.waitForCompletion(true);//三目运算的结果会引用到main()方法里的runStatusreturn status?0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//启动job,返回任务执行状态int runStatus = ToolRunner.run(configuration, new PartitionJobMain(), args);System.exit(runStatus);}
}

  • 打包

先clean,清除编译过程中产生的中间文件,然后package进行打包:

  • 将打好的jar包上传至服务器

  •  执行jar

[root@hadoop01 test_jar]# hadoop jar test_partition-1.0-SNAPSHOT.jar wyh.test.partition.PartitionJobMain#最后面的值是主类的全路径

  •  查看目录树结构

由于我们设置的是两个分区,所以这里就会生成两个分区文件:

  •  查看分区结果

在PartitionPartitioner类中我们指定了成绩大于90的进入1号分区,否则进入0号分区:

 所以查看分区文件中0号文件得到的就是成绩<&#61;90的所有学生信息&#xff1a;

 查看分区文件中1号文件得到的就是成绩>90的所有学生信息&#xff1a;

 这样就简单地实现了MapReduce中分区的功能。




推荐阅读
  • hadoop3.1.2 first programdefault wordcount (Mac)
    hadoop3.1.2安装完成后的第一个实操示例程 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • Python 数据分析领域不仅拥有高质量的开发环境,还提供了众多功能强大的第三方库。本文将介绍六个关键步骤,帮助读者掌握 Python 数据分析的核心技能,并深入探讨六款虽不广为人知但却极具潜力的数据处理库,如 Pandas 的替代品和新兴的可视化工具,助力数据科学家和分析师提升工作效率。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 本文详细介绍了在Windows操作系统上使用Python 3.8.5编译支持CUDA 11和cuDNN 8.0.2的TensorFlow 2.3的步骤。文章不仅提供了详细的编译指南,还分享了编译后的文件下载链接,方便用户快速获取所需资源。此外,文中还涵盖了常见的编译问题及其解决方案,确保用户能够顺利进行编译和安装。 ... [详细]
  • Hadoop + Spark安装(三) —— 调hadoop
    ***************************测试hadoop及问题跟进***************************执行以下语句报错datahadoop-2.9. ... [详细]
  • 前期Linux环境准备1.修改Linux主机名2.修改IP3.修改主机名和IP的映射关系4.关闭防火墙5.ssh免登陆6.安装JDK,配置环境变量等集群规划主机 IP安装软件运行进 ... [详细]
author-avatar
阳吉登
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有