作者:坚强萝卜_854 | 来源:互联网 | 2023-09-23 16:05
1、InputFormat
运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?
InputFormat是MapReduce框架用来读取数据的类。
InputFormat常见子类包括:
- TextInputFormat (普通文本文件,MR框架默认的读取实现类型)
- KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
- NLineInputF ormat(读取数据按照行数进行划分分片)
- CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
- 自定义InputFormat
CombineTextInputFormat案例
MR框架默认的TextInputFormat切片机制按文件划分切片,文件无论多小,都是单独一个切片,然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个MapTask处理的数据量很小大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用率不高。
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。
需求:
将输入数据中的多个小文件合并为一个切片处理,运行WordCount案例,准备多个小文件
具体使用方式:
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m, 单位是kb
验证切片数量的变化!!
CombineTextInputFormat切片原理
切片生成过程分为两部分:虚拟存储过程和切片过程
假设设置setMaxInputSplitSize值为4M
四个小文件:1.txt -->2M ;2.txt-->7M;3.txt-->0.3M;4.txt--->8.2M
虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
比如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
- 1.txt-->2M;2M<4M;一个块;
- 2.txt-->7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
- 3.txt-->0.3M;0.3<4M ,0.3M<4M ,一个块
- 4.txt-->8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
- 所有块信息:2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。
切片过程:
- 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
- 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
- 按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件,则虚拟存储之后形成7个文件块,大小分别为:2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M
最终会形成3个切片,大小分别为:
(2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
2、自定义InputFormat
HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
需求:
将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
结果:得到一个合并了多个小文件的SequenceFile文件
整体思路:
- 定义一个类继承FileInputFormat(TextInputFormat的父类就是FileInputFormat)
- 重写isSplitable()指定为不可切分;重写createRecordReader()方法,创建自己的RecorderReader对象(实现数据自定义读取)
- 改变默认读取数据方式,实现一次读取一个完整文件作为kv输出;
- Driver指定使用的InputFormat类型
代码参考:
自定义InputFormat
package com.lagou.mr.sequence;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.io.*;import java.io.IOException;// 自定义inputFormat读取多个小文件合并为一个SequenceFile文件
// SequenceFile文件中以kv形式存储文件,key --> 文件路径+文件名称, value --> 文件的整个内容
// TextInputFormat中泛型是LongWritable:文本的偏移量, Text:一行文本内容:指定当前inputFormat的输出数据类型
// 自定义inputFormat:key-->文件路径+名称,value-->整个文件内容
public class CustomInputFormat extends FileInputFormat {// 重写是否可切分@Overrideprotected boolean isSplitable(JobContext context, Path filename) {// 对于当前需求,不需要把文件切分,保证一个切片就是文件/*** 返回true允许切分* 返回false不允许切分*/return false;}// RecordReader就是用来读取数据的对象@Overridepublic RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {CustomRecordReader recordReader = new CustomRecordReader();// 调用recordReader的初始化方法recordReader.initialize(split, context);return recordReader;}
}
自定义RecordReader
package com.lagou.mr.sequence;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;// 附则读取数据,一次读取整个文件内容,封装为kv输出
public class CustomRecordReader extends RecordReader {private FileSplit split;// hadoop配置文件对象private Configuration conf;// 定义key,value的成员变量private Text key = new Text();private BytesWritable value = new BytesWritable();// 初始化方法,把切片以及上下文提升为全局@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit) split;cOnf= context.getConfiguration();}private Boolean flag = true;// 用来读取数据的方法@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {// 对于当前split来说只需要读取一次即可,因为一次就把整个文件全部读取了if (flag) {// 准备一个数组存放读取到的数据,数据大小是多少?byte[] cOntent= new byte[(int) split.getLength()];Path path = split.getPath(); // 获取切片的path信息FileSystem fs = path.getFileSystem(conf); // 获取到文件系统对象FSDataInputStream fis = fs.open(path); // 获取到输入流IOUtils.readFully(fis, content, 0, content.length); // 读取数据并把数据放在数组中// 封装key和valuekey.set(path.toString());value.set(content, 0, content.length);// 关闭流IOUtils.closeStream(fis);// 把再次读取的开关置为falseflag = false;return true;}return false;}// 获取到key@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}// 获取到value@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}// 获取进度@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}// 关闭资源@Overridepublic void close() throws IOException {}
}
Mapper
package com.lagou.mr.sequence;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;// Text:代表的是一个文件的path+名称,BytesWritable:代表的是一个文件的内容
public class SequenceFileMapper extends Mapper {@Overrideprotected void map(Text key, BytesWritable value, Mapper.Context context) throws IOException, InterruptedException {//读取内容直接输出context.write(key, value);}
}
Reducer
package com.lagou.mr.sequence;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class SequenceFileReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {//输出value值,其中只有一个BytesWritable 所以直接next取出即可context.write(key, values.iterator().next());}
}
Driver
package com.lagou.mr.sequence;import com.lagou.mr.wc.WordCountDriver;
import com.lagou.mr.wc.WordCountMapper;
import com.lagou.mr.wc.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class SequenceDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {/*1. 获取配置文件对象,获取job对象实例2. 指定程序jar的本地路径3. 指定Mapper/Reducer类4. 指定Mapper输出的kv数据类型5. 指定最终输出的kv数据类型6. 指定job处理的原始数据路径7. 指定job输出结果路径8. 提交作业*/
// 1. 获取配置文件对象,获取job对象实例final Configuration cOnf= new Configuration();final Job job = Job.getInstance(conf, "SequenceDriver");
// 2. 指定程序jar的本地路径job.setJarByClass(SequenceDriver.class);
// 3. 指定Mapper/Reducer类job.setMapperClass(SequenceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);
// 4. 指定Mapper输出的kv数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);
// 5. 指定最终输出的kv数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//设置使用自定义InputFormat读取数据job.setInputFormatClass(CustomInputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\data\\小文件")); //指定读取数据的原始路径
// 7. 指定job输出结果路径FileOutputFormat.setOutputPath(job, new Path("D:\\out_file\\out_1")); //指定结果数据输出路径
// 8. 提交作业final boolean flag = job.waitForCompletion(true);//jvm退出:正常退出0,非0值则是错误退出System.exit(flag ? 0 : 1);}
}