These are development notes for Big Data - Hadoop Ecosystem (20) - MapReduce Framework Internals - OutputFormat, covering the built-in OutputFormat implementations, the steps for writing a custom OutputFormat, and a worked case: log lines containing nty are written to nty.log, and everything else to other.log.
1. OutputFormat interface implementations
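Hadoop ships several ready-made implementations of this interface: TextOutputFormat, the default, writes each key/value pair as a tab-separated line of text, and SequenceFileOutputFormat writes compact binary SequenceFiles that are convenient as input to a downstream MapReduce job. When none of the built-in formats fit, you write your own, as the rest of this post shows.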
2. Custom OutputFormat
Steps:
1). Define a class that extends FileOutputFormat and override getRecordWriter
2). Define a class that extends RecordWriter and override the write method
3. Case study
Given a log file, write the lines that contain nty to nty.log and everything else to other.log. Sample input:
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.nty.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
Custom class extending FileOutputFormat:
package com.nty.outputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* author nty
* date time 2018-12-12 19:28
*/
public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {

    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // create our writer and hand it the task context so it can open its output streams
        FilterRecordWrite frw = new FilterRecordWrite();
        frw.init(job);
        return frw;
    }
}
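The framework calls getRecordWriter once per output task, so each task gets its own FilterRecordWrite; init(job) simply hands over the TaskAttemptContext so the writer can resolve the output directory and open its streams before any records arrive.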
Custom RecordWriter, overriding write:
package com.nty.outputFormat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* author nty
* date time 2018-12-12 19:29
*/
public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {

    private FSDataOutputStream nty;
    private FSDataOutputStream other;

    // receive the job context from FilterOutputFormat.getRecordWriter
    public void init(TaskAttemptContext job) throws IOException {
        String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        FileSystem fileSystem = FileSystem.get(job.getConfiguration());
        nty = fileSystem.create(new Path(outDir + "/nty.log"));
        other = fileSystem.create(new Path(outDir + "/other.log"));
    }

    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        String address = value.toString() + "\n";
        if (address.contains("nty")) {
            nty.write(address.getBytes());
        } else {
            other.write(address.getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // close the streams
        IOUtils.closeStream(nty);
        IOUtils.closeStream(other);
    }
}
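Instead of reading the raw OUTDIR configuration key, init could also use FileOutputFormat's public helper; a minimal sketch (getOutputPath accepts any JobContext, and TaskAttemptContext is one):

Path outPath = FileOutputFormat.getOutputPath(job);
FileSystem fileSystem = outPath.getFileSystem(job.getConfiguration());  // filesystem backing the output path
nty = fileSystem.create(new Path(outPath, "nty.log"));
other = fileSystem.create(new Path(outPath, "other.log"));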
Driver class setup:
package com.nty.outputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* author nty
* date time 2018-12-12 19:29
*/
public class FilterDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FilterDriver.class);
        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
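Note that the driver registers no Mapper or Reducer, so Hadoop's identity Mapper and Reducer run, and the default job output types (LongWritable key, Text value) happen to line up with FilterRecordWrite's generics. If your own job uses different types, declare them explicitly:

job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);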
Output
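Given the sample input above, d:\Hadoop_test_out should end up with the usual _SUCCESS marker plus the two files written by FilterRecordWrite:

nty.log:
http://www.nty.com

other.log:
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com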