热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReduce设置输出文件到多个文件夹下(二)

app类packagemrtest.multipleout;importcom.zyr.baseutil.UrlUtil;importorg.apache.hadoop.conf.

app类

package mrtest.multipleout;
import com.zyr.baseutil.UrlUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.mapred.OrcStruct;
import util.MOrcOutputFormat;
import java.io.IOException;
/**
* 多输出orc文件
*
@author Administrator
* 开发时写orc时注意orc的版本问题 1.6.2
*
*/
public class MultipleOrcOutDriver {
public static String a = "a";
public static String b = "b";
public static String counterInPath = "/txj/a.txt";
public static String counterOutPath = "file:///F:/txj/test";
/**
* 程序入口
*
@param args
*
@throws IOException
*
@throws ClassNotFoundException
*
@throws InterruptedException
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1获取job对象信息
Configuration cOnf= new Configuration();
Job job
= Job.getInstance(conf);
//2设置加载jar位置
job.setJarByClass(MultipleOrcOutDriver.class);
//3设置mapper和reducer的class类
job.setMapperClass(MultipleOrcOutMapper.class);
job.setReducerClass(MultipleOrcOutReduce.
class);
//4设置输出mapper的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.
class);
//5设置最终数据输出类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcStruct.
class);
//6设置输入数据和输出数据路径
FileInputFormat.setInputPaths(job, new Path(counterInPath));
// FileOutputFormat.setOutputPath(job, new Path(counterOutPath));
MOrcOutputFormat.setOutputPath(job, new Path(counterOutPath));
job.setInputFormatClass(TextInputFormat.
class);
// job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputFormatClass(MOrcOutputFormat.class);
//
job.getConfiguration().set(a,counterOutPath +"/a/a");
job.getConfiguration().set(b,counterOutPath
+"/b/b");
MultipleOutputs.addNamedOutput(job, MultipleOrcOutDriver.a, MOrcOutputFormat.
class, NullWritable.class, OrcStruct.class);
MultipleOutputs.addNamedOutput(job, MultipleOrcOutDriver.b, MOrcOutputFormat.
class, NullWritable.class, OrcStruct.class);
//启用
MultipleOutputs.setCountersEnabled(job, true);
//懒加载output模式 防止因为多路输出时 没有文件但是依然创建旧目录和空文件
LazyOutputFormat.setOutputFormatClass(job, MOrcOutputFormat.class);
//重新设置文件输出个数控制,默认控制是120个
job.getConfiguration().setInt("mapreduce.job.counters.max", 1000000000);
job.getConfiguration().setInt(
"mapreduce.job.counters.limit", 1000000000);
     // 
Limits.init(job.getConfiguration());
//7提交
boolean result = job.waitForCompletion(true);
System.exit(result
?0:1);
}
}

mapper类

package mrtest.multipleout;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* 实现自己的mapper类
*
@author Administrator
*/
public class MultipleOrcOutMapper extends Mapper {
private Text outKey = new Text();
private IntWritable outVal = new IntWritable();
/**
* 分割获取到每个单词
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
// 获取这一行内容
String lineStr = value.toString();
// 获取每个单词
String[] words = lineStr.split(" ");
if (words.length==2){
outKey.set(words[
1]);
outVal.set(Integer.parseInt(words[
0]));
context.write(outKey, outVal);
}
};
}

reduce类

package mrtest.multipleout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;
/**
*
*
@author Administrator
*
*/
public class MultipleOrcOutReduce extends Reducer{
//要创建的ORC文件中的字段类型
private TypeDescription schema = TypeDescription.fromString("struct");
private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
public static MultipleOutputs mo;
public static String a = "";
public static String b = "";
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf
= context.getConfiguration();
mo
= new MultipleOutputs<>(context);
a
= conf.get(MultipleOrcOutDriver.a);
b
= conf.get(MultipleOrcOutDriver.b);
}
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
Iterator
it = values.iterator();
int count = 0;
while (it.hasNext()){
IntWritable val
= it.next();
count
+= val.get();
}
switch (key.toString()){
case "a":
pair.setFieldValue(
"val",key);
pair.setFieldValue(
"count",new LongWritable(count));
mo.write(MultipleOrcOutDriver.a,NullWritable.get(),pair,a);
break;
case "b":
pair.setFieldValue(
"val",key);
pair.setFieldValue(
"count",new LongWritable(count));
mo.write(MultipleOrcOutDriver.b,NullWritable.get(),pair,b);
break;
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mo.close();
}
}



推荐阅读
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • Maven构建Hadoop,
    Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • importorg.apache.hadoop.hdfs.DistributedFileSystem;导入方法依赖的package包类privatevoidtestHSyncOpe ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
  • 本文介绍了Perl的测试框架Test::Base,它是一个数据驱动的测试框架,可以自动进行单元测试,省去手工编写测试程序的麻烦。与Test::More完全兼容,使用方法简单。以plural函数为例,展示了Test::Base的使用方法。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
author-avatar
嗒嗒爱臭臭
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有