热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReduce设置输出文件到多个文件夹下(二)

app类packagemrtest.multipleout;importcom.zyr.baseutil.UrlUtil;importorg.apache.hadoop.conf.

app类

package mrtest.multipleout;
import com.zyr.baseutil.UrlUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.mapred.OrcStruct;
import util.MOrcOutputFormat;
import java.io.IOException;
/**
* 多输出orc文件
*
@author Administrator
* 开发时写orc时注意orc的版本问题 1.6.2
*
*/
public class MultipleOrcOutDriver {
public static String a = "a";
public static String b = "b";
public static String counterInPath = "/txj/a.txt";
public static String counterOutPath = "file:///F:/txj/test";
/**
* 程序入口
*
@param args
*
@throws IOException
*
@throws ClassNotFoundException
*
@throws InterruptedException
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1获取job对象信息
Configuration cOnf= new Configuration();
Job job
= Job.getInstance(conf);
//2设置加载jar位置
job.setJarByClass(MultipleOrcOutDriver.class);
//3设置mapper和reducer的class类
job.setMapperClass(MultipleOrcOutMapper.class);
job.setReducerClass(MultipleOrcOutReduce.
class);
//4设置输出mapper的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.
class);
//5设置最终数据输出类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcStruct.
class);
//6设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job, new Path(counterInPath));
// FileOutputFormat.setOutputPath(job, new Path(counterOutPath));
MOrcOutputFormat.setOutputPath(job, new Path(counterOutPath));
job.setInputFormatClass(TextInputFormat.
class);
// job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(MOrcOutputFormat.class);
//
job.getConfiguration().set(a,counterOutPath +"/a/a");
job.getConfiguration().set(b,counterOutPath
+"/b/b");
MultipleOutputs.addNamedOutput(job, MultipleOrcOutDriver.a, MOrcOutputFormat.
class, NullWritable.class, OrcStruct.class);
MultipleOutputs.addNamedOutput(job, MultipleOrcOutDriver.b, MOrcOutputFormat.
class, NullWritable.class, OrcStruct.class);
//启用
MultipleOutputs.setCountersEnabled(job, true);
//懒加载output模式 防止因为多路输出时 没有文件但是依然创建旧目录和空文件
LazyOutputFormat.setOutputFormatClass(job, MOrcOutputFormat.class);
//重新设置文件输出个数控制,默认控制是120个
job.getConfiguration().setInt("mapreduce.job.counters.max", 1000000000);
job.getConfiguration().setInt(
"mapreduce.job.counters.limit", 1000000000);
     // 
Limits.init(job.getConfiguration());
//7提交
boolean result = job.waitForCompletion(true);
System.exit(result
?0:1);
}
}

mapper类

package mrtest.multipleout;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 实现自己的mapper类
*
@author Administrator
*/
public class MultipleOrcOutMapper extends Mapper {
private Text outKey = new Text();
private IntWritable outVal = new IntWritable();
/**
* 分割获取到每个单词
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
// 获取这一行内容
String lineStr = value.toString();
// 获取每个单词
String[] words = lineStr.split(" ");
if (words.length==2){
outKey.set(words[
1]);
outVal.set(Integer.parseInt(words[
0]));
context.write(outKey, outVal);
}
};
}

reduce类

package mrtest.multipleout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;
/**
*
*
@author Administrator
*
*/
public class MultipleOrcOutReduce extends Reducer{
//要创建的ORC文件中的字段类型
private TypeDescription schema = TypeDescription.fromString("struct");
private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
public static MultipleOutputs mo;
public static String a = "";
public static String b = "";
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf
= context.getConfiguration();
mo
= new MultipleOutputs<>(context);
a
= conf.get(MultipleOrcOutDriver.a);
b
= conf.get(MultipleOrcOutDriver.b);
}
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
Iterator
it = values.iterator();
int count = 0;
while (it.hasNext()){
IntWritable val
= it.next();
count
+= val.get();
}
switch (key.toString()){
case "a":
pair.setFieldValue(
"val",key);
pair.setFieldValue(
"count",new LongWritable(count));
mo.write(MultipleOrcOutDriver.a,NullWritable.get(),pair,a);
break;
case "b":
pair.setFieldValue(
"val",key);
pair.setFieldValue(
"count",new LongWritable(count));
mo.write(MultipleOrcOutDriver.b,NullWritable.get(),pair,b);
break;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mo.close();
}
}



推荐阅读
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • 在Hive中合理配置Map和Reduce任务的数量对于优化不同场景下的性能至关重要。本文探讨了如何控制Hive任务中的Map数量,分析了当输入数据超过128MB时是否会自动拆分,以及Map数量是否越多越好的问题。通过实际案例和实验数据,本文提供了具体的配置建议,帮助用户在不同场景下实现最佳性能。 ... [详细]
  • Python 数据分析领域不仅拥有高质量的开发环境,还提供了众多功能强大的第三方库。本文将介绍六个关键步骤,帮助读者掌握 Python 数据分析的核心技能,并深入探讨六款虽不广为人知但却极具潜力的数据处理库,如 Pandas 的替代品和新兴的可视化工具,助力数据科学家和分析师提升工作效率。 ... [详细]
  • 本文整理了Java中org.apache.hadoop.mapreduce.lib.input.MultipleInputs.addInputPath()方法的一些代码 ... [详细]
author-avatar
嗒嗒爱臭臭
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有