点击上方蓝色字体,选择“设为星标”
回复”面试“获取更多惊喜
问题是这样的:
HDFS上存储了一个大小10G不可分割压缩格式的文件(gzip格式),当有一个mr任务去读取这个文件的时候会产生多少个map task?spark去读取这种不可分割格式的大文件时是怎么处理的呢?
关于这个问题,大家应该都看过这个:
Hadoop所支持的几种压缩格式
gzip文件最大的特点在于:不可分割。
OK,我们知道gzip不可分割了。那么一个10G的gzip文件在HDFS是怎么存储的呢?
首先,一个10G的gzip文件在HDFS是放在一个DataNode上,但是blocks=ceil(10G/128M),副本还是3份(hadoop2.0 默认),因为gzip不可分割。
意思就是,这个gzip文件会被存储在一个DataNode上,但是占用的block数量还是 10G/每个block的大小(假设是128M),并且向上取整。
其次,MapReduce在读gzip文件的时候要指定解压方法,就是GzipCodec。然后用InputStream方法去读,MapTask的个数和读取一般文件的个数是一样的。
关于Hadoop Maptask个数,有一个计算公式。代码逻辑和计算公式如下:
作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:
public List getSplits(JobContext job) throws IOException {//getFormatMinSplitSize():始终返回1//getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//getMaxSplitSize(job):获取"mapred.max.split.size"的值,//默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1long maxSize = getMaxSplitSize(job);// generate splitsList splits = new ArrayList();Listfiles = listStatus(job);for (FileStatus file: files) {Path path = file.getPath();FileSystem fs = path.getFileSystem(job.getConfiguration());long length = file.getLen();BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);if ((length != 0) && isSplitable(job, path)) {long blockSize = file.getBlockSize();//计算split大小long splitSize = computeSplitSize(blockSize, minSize, maxSize);//计算split个数long bytesRemaining = length; //bytesRemaining表示剩余字节数while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(new FileSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkLocations.length-1].getHosts()));}} else if (length != 0) {splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));} else {//Create empty hosts array for zero length filessplits.add(new FileSplit(path, 0, length, new String[0]));}}// Save the number of input files in the job-confjob.getConfiguration().setLong(NUM_INPUT_FILES, files.size());LOG.debug("Total # of splits: " + splits.size());return splits;}
首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));
}
接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:
文件剩余字节数/splitSize > 1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize;
文件剩余字节数/splitSize <1.1&#xff0c;剩余的部分全都作为一个split(这主要是考虑到&#xff0c;不用为剩余的很少的字节数一些启动一个Map Task)
我们发现&#xff0c;在默认配置下&#xff0c;split大小和block大小是相同的。这是不是为了防止这种情况&#xff1a;
一个split如果对应的多个block&#xff0c;若这些block大多不在本地&#xff0c;则会降低Map Task的本地性&#xff0c;降低效率。到这里split的划分就介绍完了&#xff0c;但是有两个问题需要考虑&#xff1a;
如果一个record跨越了两个block该怎么办&#xff1f;
这个可以看到&#xff0c;在Map Task读取block的时候&#xff0c;每次是读取一行的&#xff0c;如果发现块的开头不是上一个文件的结束&#xff0c;那么抛弃第一条record&#xff0c;因为这个record会被上一个block对应的Map Task来处理。那么&#xff0c;第二个问题来了&#xff1a;
上一个block对应的Map Task并没有最后一条完整的record&#xff0c;它又该怎么办&#xff1f;
一般来说&#xff0c;Map Task在读block的时候都会多读后续的几个block&#xff0c;以处理上面的这种情况。
最后&#xff0c;Spark在读取gzip这种不可分割文件的时候&#xff0c;就退化成从单个task读取、单个core执行任务&#xff0c;很容易产生性能瓶颈。你可以做个测试。在spark的页面上可以看到效果。
基于以上所以&#xff0c;gzip格式最好提前进行分割成小文件或者换格式&#xff0c;因多个文件可以并行读取。另一个办法是read文件后调用repartition操作强制将读取多数据重新均匀分配到不同的executor上&#xff0c;但这个操作会导致大量单节点性能占用&#xff0c;因此该格式建议不在spark上使用。
gzip问题这么多&#xff0c;常用的场景我能想到的只有一个&#xff0c;就是每天的日志文件。单个日志文件不太大&#xff0c;百兆以内。其他的场景暂时想不到。
八千里路云和月 | 从零到大数据专家学习路径指南
我们在学习Flink的时候&#xff0c;到底在学习什么&#xff1f;
193篇文章暴揍Flink&#xff0c;这个合集你需要关注一下
Flink生产环境TOP难题与优化&#xff0c;阿里巴巴藏经阁YYDS
Flink CDC我吃定了耶稣也留不住他&#xff01;| Flink CDC线上问题小盘点
我们在学习Spark的时候&#xff0c;到底在学习什么&#xff1f;
在所有Spark模块中&#xff0c;我愿称SparkSQL为最强&#xff01;
硬刚Hive | 4万字基础调优面试小总结
数据治理方法论和实践小百科全书
标签体系下的用户画像建设小指南
4万字长文 | ClickHouse基础&实践&调优全视角解析
【面试&个人成长】2021年过半&#xff0c;社招和校招的经验之谈
大数据方向另一个十年开启 |《硬刚系列》第一版完结
我写过的关于成长/面试/职场进阶的文章
当我们在学习Hive的时候在学习什么&#xff1f;「硬刚Hive续集」
你好&#xff0c;我是王知无&#xff0c;一个大数据领域的硬核原创作者。
做过后端架构、数据中间件、数据平台&架构、算法工程化。
专注大数据领域实时动态&技术提升&个人成长&职场进阶&#xff0c;欢迎关注。