First, let's read through the following source code, from the class FileInputFormat:
public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch sw = new Stopwatch().start();
  // Effective lower/upper bounds for the split size, from the job configuration
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // Generate splits in one pass over all input files
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0L) {
      // Look up where the file's blocks physically live
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0L, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        // splitSize = max(minSize, min(maxSize, blockSize))
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        // 1.1D is SPLIT_SLOP: keep slicing while the remainder exceeds
        // splitSize by more than 10%
        while ((double) bytesRemaining / (double) splitSize > 1.1D) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }
        // The tail (at most 1.1 x splitSize) becomes the final split
        if (bytesRemaining != 0L) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else {
        // Not splittable (e.g. an unsplittable compression codec):
        // one split for the whole file
        splits.add(makeSplit(path, 0L, length,
            blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      // Zero-length file: empty split with an empty hosts array
      splits.add(makeSplit(path, 0L, length, new String[0]));
    }
  }
  job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long) files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits;
}
From the source code we can see how the split size is computed:

splitSize = max(minSize, min(maxSize, blockSize))

min(maxSize, blockSize) takes the smaller of maxSize and blockSize, and max(minSize, min(...)) then takes the larger of minSize and that inner result. With the default HDFS block size of 128 MB, and the default effective values minSize = 1 and maxSize = Long.MAX_VALUE, splitSize is simply blockSize = 128 MB. Therefore, to increase the split size you raise minSize in max(minSize, ...) above the block size, and to decrease it you lower maxSize in min(maxSize, blockSize) below the block size.
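To make the formula concrete, here is a small self-contained sketch that re-implements the same max/min arithmetic (SplitSizeDemo is an illustrative name, not Hadoop code, and the 64 MB / 256 MB values are arbitrary examples):

import java.io.IOException;

public class SplitSizeDemo {
  // Same formula as FileInputFormat.computeSplitSize()
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) throws IOException {
    long mb = 1024L * 1024L;
    long blockSize = 128 * mb;
    // Defaults (minSize = 1, maxSize = Long.MAX_VALUE): splitSize = blockSize
    System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE) / mb); // 128
    // Lower maxSize below the block size to shrink splits
    System.out.println(computeSplitSize(blockSize, 1L, 64 * mb) / mb);        // 64
    // Raise minSize above the block size to enlarge splits
    System.out.println(computeSplitSize(blockSize, 256 * mb, Long.MAX_VALUE) / mb); // 256
  }
}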
Hadoop also provides a parameter for setting the number of map tasks, mapred.map.tasks (the older name for mapreduce.job.maps), and we can use it to influence the map count. However, setting it does not always take effect, because mapred.map.tasks is only a hint to Hadoop: the final number of maps also depends on other factors, chiefly the number of input splits computed above.
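As an illustration (a sketch only; the value 10 and the job name are arbitrary), the hint is just an ordinary configuration entry, while getSplits() still decides the real map count:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapCountHint {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Only a hint: the actual number of maps equals the number of
    // input splits, which this setting cannot force downward.
    conf.setInt("mapred.map.tasks", 10);
    Job job = Job.getInstance(conf, "map-count-hint-demo");
    System.out.println(job.getConfiguration().get("mapred.map.tasks")); // 10
  }
}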
When tuning the number of maps, the rules can be summarized as follows (see the sketch after this list):
(1) To increase the number of maps, set mapred.map.tasks to a larger value.
(2) To decrease the number of maps, set mapred.min.split.size (the older name for mapreduce.input.fileinputformat.split.minsize) to a larger value.
(3) If the input contains many small files and you still want fewer maps, first merge the small files into larger ones, then apply rule (2).
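As a minimal sketch of rule (2) and its converse (assuming the new mapreduce API; args[0] as the input path and the 256 MB / 64 MB values are illustrative), the split-size bounds can be set through the static helpers on FileInputFormat:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuning {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-tuning-demo");
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // Rule (2): fewer maps -- raise minSize above the 128 MB block size,
    // so splitSize = max(minSize, min(maxSize, blockSize)) = 256 MB
    FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

    // Conversely, for more maps: cap maxSize below the block size,
    // so splitSize = 64 MB (use one knob or the other, not both)
    // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
  }
}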