Author: 嗒嗒爱臭臭 | Source: Internet | 2023-08-29 20:02
App class
package mrtest.multipleout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.mapred.OrcStruct;
import util.MOrcOutputFormat;
import java.io.IOException;
/**
 * Driver for writing ORC files to multiple named outputs.
 * @author Administrator
 * Note: mind the ORC library version when writing ORC files; this was developed against 1.6.2.
 */
public class MultipleOrcOutDriver {
    // Names of the two named outputs
    public static String a = "a";
    public static String b = "b";
    // Input and output paths
    public static String counterInPath = "/txj/a.txt";
    public static String counterOutPath = "file:///F:/txj/test";

    /**
     * Program entry point
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the job from a fresh configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Locate the jar by the driver class
        job.setJarByClass(MultipleOrcOutDriver.class);
        // 3. Set the mapper and reducer classes
        job.setMapperClass(MultipleOrcOutMapper.class);
        job.setReducerClass(MultipleOrcOutReduce.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(counterInPath));
        // FileOutputFormat.setOutputPath(job, new Path(counterOutPath));
        MOrcOutputFormat.setOutputPath(job, new Path(counterOutPath));
        job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputFormatClass(MOrcOutputFormat.class);
        // Record the base output path of each named output so the reducer can look it up
        job.getConfiguration().set(a, counterOutPath + "/a/a");
        job.getConfiguration().set(b, counterOutPath + "/b/b");
        MultipleOutputs.addNamedOutput(job, MultipleOrcOutDriver.a, MOrcOutputFormat.class, NullWritable.class, OrcStruct.class);
        MultipleOutputs.addNamedOutput(job, MultipleOrcOutDriver.b, MOrcOutputFormat.class, NullWritable.class, OrcStruct.class);
        // Enable counters for the named outputs
        MultipleOutputs.setCountersEnabled(job, true);
        // Lazy output mode: avoids creating directories and empty part files for named outputs that receive no records
        LazyOutputFormat.setOutputFormatClass(job, MOrcOutputFormat.class);
        // Raise the counter limit; the default maximum is 120 counters
        job.getConfiguration().setInt("mapreduce.job.counters.max", 1000000000);
        job.getConfiguration().setInt("mapreduce.job.counters.limit", 1000000000);
        // Re-initialize the counter limits from the updated configuration
        Limits.init(job.getConfiguration());
        // 7. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
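The driver imports util.MOrcOutputFormat, but the original post never shows that class. A minimal sketch of what it could look like (my reconstruction, not the author's code), assuming it is a thin subclass of the ORC library's new-API OrcOutputFormat from the orc-mapreduce artifact:

package util;

import org.apache.hadoop.io.Writable;
import org.apache.orc.mapreduce.OrcOutputFormat;

/**
 * Hypothetical reconstruction: a thin subclass of the ORC new-API output format.
 * setOutputPath(...) is inherited from FileOutputFormat, which is why the driver
 * can call MOrcOutputFormat.setOutputPath(job, path) directly. The real class
 * presumably also pins the output schema (the orc.mapred.output.schema setting),
 * since the driver above never sets it.
 */
public class MOrcOutputFormat<V extends Writable> extends OrcOutputFormat<V> {
}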
Mapper class
package mrtest.multipleout;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Custom mapper: parses "<count> <word>" lines into (word, count) pairs.
 * @author Administrator
 */
public class MultipleOrcOutMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text outKey = new Text();
    private IntWritable outVal = new IntWritable();

    /**
     * Split the line to get each word.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        // Read one line of input
        String lineStr = value.toString();
        // Expected format: "<count> <word>", e.g. a hypothetical line "3 a"
        String[] words = lineStr.split(" ");
        if (words.length == 2) {
            outKey.set(words[1]);
            outVal.set(Integer.parseInt(words[0]));
            context.write(outKey, outVal);
        }
    }
}
Reducer class
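The reducer below sums the counts for each word and, for the keys "a" and "b", fills one ORC row (val, count) and routes it through MultipleOutputs. Note that the fourth argument to mo.write(...) is the base output path, which is why the driver stored counterOutPath + "/a/a" and "/b/b" in the configuration: each named output's files end up under its own subdirectory.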
package mrtest.multipleout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import java.io.IOException;
import java.util.Iterator;
/**
 * Custom reducer: sums counts per key and writes ORC rows to the matching named output.
 * @author Administrator
 */
public class MultipleOrcOutReduce extends Reducer<Text, IntWritable, NullWritable, OrcStruct> {
    // Field types of the ORC rows to create
    private TypeDescription schema = TypeDescription.fromString("struct<val:string,count:bigint>");
    private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
    private MultipleOutputs<NullWritable, OrcStruct> mo;
    private String a = "";
    private String b = "";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        mo = new MultipleOutputs<>(context);
        // Base output paths recorded by the driver
        a = conf.get(MultipleOrcOutDriver.a);
        b = conf.get(MultipleOrcOutDriver.b);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum all counts for this key
        Iterator<IntWritable> it = values.iterator();
        int count = 0;
        while (it.hasNext()) {
            IntWritable val = it.next();
            count += val.get();
        }
        // Route the row to the named output matching the key, with an explicit base output path
        switch (key.toString()) {
            case "a":
                pair.setFieldValue("val", key);
                pair.setFieldValue("count", new LongWritable(count));
                mo.write(MultipleOrcOutDriver.a, NullWritable.get(), pair, a);
                break;
            case "b":
                pair.setFieldValue("val", key);
                pair.setFieldValue("count", new LongWritable(count));
                mo.write(MultipleOrcOutDriver.b, NullWritable.get(), pair, b);
                break;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mo.close();
    }
}
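To spot-check the job's output, the part files can be read back with the ORC core reader. A minimal sketch (my addition, not from the original post), assuming the struct<val:string,count:bigint> schema above, an orc-core version matching the 1.6.2 noted at the top, and an illustrative part-file name:

package util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

/** Hypothetical helper: dumps (val, count) rows from one ORC part file. */
public class OrcPartDump {
    public static void main(String[] args) throws Exception {
        // e.g. args[0] = "F:/txj/test/a/a-r-00000.orc" (actual name depends on the output format)
        Reader reader = OrcFile.createReader(new Path(args[0]),
                OrcFile.readerOptions(new Configuration()));
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        while (rows.nextBatch(batch)) {
            // Columns in schema order; isRepeating/null handling omitted for brevity
            BytesColumnVector val = (BytesColumnVector) batch.cols[0];
            LongColumnVector count = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < batch.size; r++) {
                System.out.println(val.toString(r) + "\t" + count.vector[r]);
            }
        }
        rows.close();
    }
}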