/**
 * Generate the list of files and make them into FileSplits.
 *
 * <p>For each non-empty input file: if the file is splitable, it is carved into
 * chunks of {@code computeSplitSize(blockSize, minSize, maxSize)} bytes as long as
 * the remainder is more than {@code SPLIT_SLOP} (1.1) times the split size, with the
 * tail emitted as one final (possibly larger-than-splitSize) split; otherwise the
 * whole file becomes a single split. Zero-length files become one empty-host split.
 *
 * @param job the job context
 * @return the list of splits (one {@code InputSplit} per element)
 * @throws IOException if block locations cannot be obtained from the file system
 */
public List getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  // Lower bound on split size: the format's floor combined with the job's
  // configured mapreduce.input.fileinputformat.split.minsize.
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // Upper bound on split size (Long.MAX_VALUE when not configured).
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List splits = new ArrayList();
  // Status objects (including block-size info) for every file under the
  // configured input paths.
  List files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        // Block locations were already fetched along with the listing.
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        // Ask the file system for the block locations of the whole file.
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // Can this file be logically split (e.g. not a non-splittable codec)?
      if (isSplitable(job, path)) {
        // Block size of this file (commonly 128 MB on HDFS by default).
        long blockSize = file.getBlockSize();
        // Effective split size: max(minSize, min(maxSize, blockSize)).
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // Keep cutting full-size splits while the remainder exceeds
        // SPLIT_SLOP (1.1) times the split size. This lets the last split
        // absorb up to 10% extra rather than spawning a tiny map task.
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        // Emit the tail (anything left over, possibly > splitSize but
        // <= SPLIT_SLOP * splitSize) as the final split.
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        // Whole file as one split, located at its first block's hosts.
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
//最小切片大小 /*** Get the minimum split size* @param job the job* @return the minimum number of bytes that can be in a split* job.getConfiguration():获取job的所有配置文件信息,写到 job.xml中 * job.settJarByClass(Driver.class);jar.classDriver.class,没有配置的则使用默认值* getLong(配置属性名,默认返回值),先去配置文件中获取配置属性的值,获取到则返回,获取不到则返回默认值(参数2)。* mapreduce.input.fileinputformat.split.minsize,默认是:0* mapred-default.xml ------> return 0*r*/public static long getMinSplitSize(JobContext job) {return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);}
//最大切片大小 /*** Get the maximum split size.* @param context the job to look at.* @return the maximum number of bytes a split can include* */public static long getMaxSplitSize(JobContext context) {/*** context.getConfiguration(),获取job.xml* mapreduce.input.fileinputformat.split.maxsize,默认没有设置* return Long.MAX_VALUE*/return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);}
//计算切片大小/*Math.max(minSize,blockSize)return blockSize 默认一个切片的大小就是一个块的大小调整切片大小大于128M ----->调整minSize小于128M ----->调整maxSize具体操作&#xff1a;//不推荐使用1&#xff09;修改配置文件&#xff1a;mapred-site.xml大于128Mmapreduce.input.fileinputformat.split.minsize130*1024*1024The minimum size chunk that map input should be splitinto. Note that some file formats may have minimum split sizes thattake priority over this setting.小于128Mmapreduce.input.fileinputformat.split.maxsize100*1024*1024The minimum size chunk that map input should be splitinto. Note that some file formats may have minimum split sizes thattake priority over this setting.2&#xff09;代码中修改//修改大于128MFileInputFormat.setMinInputSplitSize(job, size);//修改小于128MFileInputFormat.setMaxInputSplitSize(job, size);注意:1)一个文件如果不大于128M&#xff0c;这时候单独成一个切片文件大于128M&#xff0c;按照分块来2)文件切片划分的时候&#xff0c;最后一个切片有可能大于128M&#xff0c;128*1.1268M存储blk01 1-128Mblk02 129-256M 128Mblk03 257-268M 12M启动maptask时候 切片划分&#xff1a;split01 268/128>1.1 1-128剩余 268-128&#61;140Msplit02 140/128<1.1 不切 129-268M跨块或跨节点 比单独启动一个maptask划算*/protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}