热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HadoopCompression解压缩架构的学习

Hadoop的Compressor解压缩模块是HadoopCommonIO模块中又一大的模块。虽然说在现实生活中,我们使用压缩工具等的使用场景并不是那么多。或许在我们

           Hadoop的Compressor解压缩模块是Hadoop Common IO模块中又一大的模块。虽然说在现实生活中,我们使用压缩工具等的使用场景并不是那么多。或许在我们潜在的意识里,压缩的概念就停留在一些压缩种类上,zip,gzip,bizp等等不同类型的压缩,分别具有不同的压缩比,效率比等等。也许当你看完本篇本人对于Hadoop的压缩框架的学习之后,你一定会有所收获。

          压缩对于数据的传输室至关重要的,同样对于存储来说也大大的提高了效率,在Hadoop系统中目前支持的压缩算法包括1,gzip 2.bzip 3.snappy4.default系统默认算法。这些压缩工具的体现都是通过一个叫CompressionCodec的对象来体现的。先来看看这个类:

/*** This class encapsulates a streaming compression/decompression pair.*/
public interface CompressionCodec {CompressionOutputStream createOutputStream(OutputStream out) throws IOException;CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException;Class getCompressorType();Compressor createCompressor();CompressionInputStream createInputStream(InputStream in) throws IOException;CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException;Class getDecompressorType();Decompressor createDecompressor();String getDefaultExtension();
}
这是一个接口,里面定义了很多的方法,我主要把他归为2类,

1个是Compressor和Decompressor解压缩的构造,

1个是CompressionInputStream,CompressionOutputStream压缩输入输出流。

其实2者很像,因为压缩输入输出流的很多操作也是基于上面的压缩器,解压器的操作实现的。具体压缩算法的表现都是继承与这个基类。看一下比较庞大的结构图:


可以看到在每种Codec子类中,都会有解压缩器的实现和压缩输入输出流的构造。然后把这种压缩算法类保存在了一个Codec的工厂中,通过统一的接口调用。

public class CompressionCodecFactory {public static final Log LOG =LogFactory.getLog(CompressionCodecFactory.class.getName());/*** A map from the reversed filename suffixes to the codecs.* This is probably overkill, because the maps should be small, but it * automatically supports finding the longest matching suffix.* 所有的解压缩编码类放入 codecs Map图中,CompressionCodec是一个基类,* 允许添加上其所继承的子类*/private SortedMap codecs = null;初始化的时候,可以根据配置加入自己希望的压缩算法种类:

/*** Find the codecs specified in the config value io.compression.codecs * and register them. Defaults to gzip and zip.* 根据配置初始化压缩编码工厂,默认添加的是gzip和zip编码类*/public CompressionCodecFactory(Configuration conf) {codecs = new TreeMap();List> codecClasses = getCodecClasses(conf);if (codecClasses == null) {//如果没有编码类的设置,则加入gzip,defaultCodeaddCodec(new GzipCodec());addCodec(new DefaultCodec()); } else {Iterator> itr = codecClasses.iterator();while (itr.hasNext()) {CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf);addCodec(codec); }}}然后是从工厂中取出压缩算法工具的时候了,通过名字获取,其实这种模式类似于享受 模式,达到对象的复用效果了。

/*** Find the relevant compression codec for the given file based on its* filename suffix.* @param file the filename to check* @return the codec object*/public CompressionCodec getCodec(Path file) {CompressionCodec result = null;if (codecs != null) {String filename = file.getName();String reversedFilename = new StringBuffer(filename).reverse().toString();SortedMap subMap = codecs.headMap(reversedFilename);if (!subMap.isEmpty()) {String potentialSuffix = subMap.lastKey();//根据比对名字,从codecs Map中取出对应的CompressionCodecif (reversedFilename.startsWith(potentialSuffix)) {result = codecs.get(potentialSuffix);}}}return result;}         下面应该仔细分析的是具体解压缩怎么实现的步骤,我从3种算法中选了zlib/gzip算法作为底层实现例子。他们都先继承了下面这个接口:

/*** Specification of a stream-based 'compressor' which can be * plugged into a {@link CompressionOutputStream} to compress data.* This is modelled after {@link java.util.zip.Deflater}* */
public interface Compressor {/*** Sets input data for compression. * This should be called whenever #needsInput() returns * true indicating that more input data is required.* 输入待压缩的数据* * @param b Input data* @param off Start offset* @param len Length*/public void setInput(byte[] b, int off, int len);/*** Returns true if the input data buffer is empty and * #setInput() should be called to provide more input. * 判断缓冲区中能否再输入数据* * @return true if the input data buffer is empty and * #setInput() should be called in order to provide more input.*/public boolean needsInput();/*** Sets preset dictionary for compression. A preset dictionary * is used when the history buffer can be predetermined. ** @param b Dictionary data bytes* @param off Start offset* @param len Length*/public void setDictionary(byte[] b, int off, int len);/*** Return number of uncompressed bytes input so far.* 返回未压缩的数据的字节长度*/public long getBytesRead();/*** Return number of compressed bytes output so far.* 返回已压缩字节的大小*/public long getBytesWritten();/*** When called, indicates that compression should end* with the current contents of the input buffer.* 代表输入的结束*/public void finish();/*** Returns true if the end of the compressed * data output stream has been reached.* @return true if the end of the compressed* data output stream has been reached.* 判断压缩器中还有没有未取出的压缩后的数据*/public boolean finished();/*** Fills specified buffer with compressed data. Returns actual number* of bytes of compressed data. A return value of 0 indicates that* needsInput() should be called in order to determine if more input* data is required.* 压缩处理方法,将输入的压缩数据压缩处理后输出到传入的输出缓冲中* * @param b Buffer for the compressed data* @param off Start offset of the data* @param len Size of the buffer* @return The actual number of bytes of compressed data.*/public int compress(byte[] b, int off, int len) throws IOException;/*** Resets compressor so that a new set of input data can be processed.* 压缩器重置方法*/public void reset();/*** Closes the compressor and discards any unprocessed input.* 关闭压缩器,一般在结束的时候调用*/public void end();/*** Prepare the compressor to be used in a new stream with settings defined in* the given Configuration* 根据配置重新初始化压缩器的实现* * @param conf Configuration from which new setting are fetched*/public void reinit(Configuration conf);
}
里面的每一个方法都很关键,因为后面压缩,解压操作都是基于上面的函数实现。在这里,先看一下zlib中的一些关于压缩的变量的设置:

public class ZlibCompressor implements Compressor {//默认的缓冲区64kprivate static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;// HACK - Use this as a global lock in the JNI layerprivate static Class clazz = ZlibCompressor.class;private long stream;/*** 定义了压缩水平,可以是无损压缩,可以是追求效率的快速的压缩等等类型*/private CompressionLevel level;/*** 定义了压缩策略,有比如常见的哈弗曼编码方式,filterd方式,或者其他*/private CompressionStrategy strategy;/*** 定于了压缩的头部格式信息,比如一般前面都会有个checksum校验和的信息,当然可以选择NO_HEAEDER*/private final CompressionHeader windowBits;还有缓冲区的设置,定义了未压缩缓冲区的设置,已压缩缓冲区等等:

private int directBufferSize;private byte[] userBuf = null;private int userBufOff = 0, userBufLen = 0;//未压缩的缓冲private Buffer uncompressedDirectBuf = null;private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;//已压缩缓冲数据private Buffer compressedDirectBuf = null;//输入结束标识,压缩结束标识private boolean finish, finished;
默认zlib压缩器的构造:

/*** Creates a new compressor with the default compression level.* Compressed data will be generated in ZLIB format.* 默认构造出压缩器,压缩水平,策略等都是默认值*/public ZlibCompressor() {this(CompressionLevel.DEFAULT_COMPRESSION,CompressionStrategy.DEFAULT_STRATEGY,CompressionHeader.DEFAULT_HEADER,DEFAULT_DIRECT_BUFFER_SIZE);}最后悔跑到一个大的重载函数上:

public ZlibCompressor(CompressionLevel level, CompressionStrategy strategy, CompressionHeader header, int directBufferSize) {this.level = level;this.strategy = strategy;this.windowBits = header;stream = init(this.level.compressionLevel(), this.strategy.compressionStrategy(), this.windowBits.windowBits());//设置直接缓冲区的大小为64*1024个字节this.directBufferSize = directBufferSize;//申请2个一样大小的64k的缓冲区uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);//把压缩缓冲的位置挪到最后面compressedDirectBuf.position(directBufferSize);}上面的关键是定义了2个64k的缓冲区,我们初步可以断定,压缩的实现过程一定是从用户的输入,放入uncompressedDirectBuf,调用Compress压缩方法后转入compressedDirectBuf,最后拷贝到外界的缓冲中。

下面是一个压缩器中的压缩步骤

1.调用setInput()向里面输入待压缩数据

2.调用needsInput()判断是否能够输入,如果不能输入,则调用Compress取出已压缩的数据,才能再次输入数据
3.执行上述2步骤直到全部输入数据,调用finish(),代表输入结束

4.最后调用Compress连续取出压缩好的数据直到调用finished()方法判断已压缩缓冲中无数据了。
下面是一个流程图

流程搞清楚了之后,我们才能知道里面到底是怎么做的。首先setInput()方法,

public synchronized void setInput(byte[] b, int off, int len) {if (b&#61;&#61; null) {throw new NullPointerException();}if (off <0 || len <0 || off > b.length - len) {throw new ArrayIndexOutOfBoundsException();}//设置用户缓冲数据变量this.userBuf &#61; b;this.userBufOff &#61; off;this.userBufLen &#61; len;setInputFromSavedData();// Reinitialize zlib&#39;s output direct buffer //重新初始化已压缩缓冲的位置compressedDirectBuf.limit(directBufferSize);compressedDirectBuf.position(directBufferSize);}里面关键的方法是setInputFromSavedData()

synchronized void setInputFromSavedData() {uncompressedDirectBufOff &#61; 0;//更新未压缩缓冲数据的长度为用户buf的长度uncompressedDirectBufLen &#61; userBufLen;if (uncompressedDirectBufLen > directBufferSize) {//如果超过最大值&#xff0c;则变为最大值uncompressedDirectBufLen &#61; directBufferSize;}// Reinitialize zlib&#39;s input direct bufferuncompressedDirectBuf.rewind();//将用户数据存入uncompressedDirectBuf((ByteBuffer)uncompressedDirectBuf).put(userBuf, userBufOff, uncompressedDirectBufLen);// Note how much data is being fed to zlib//加上用户缓冲区的偏移量userBufOff &#43;&#61; uncompressedDirectBufLen;//减少用户缓冲区的数据长度userBufLen -&#61; uncompressedDirectBufLen;}就这样把用户数据放入了uncompressedDirectBuf中了&#xff0c;但是有最大缓冲值的限制。下面看看needsInput()判断是否能输入的判断:

public boolean needsInput() {// Consume remaining compressed data?if (compressedDirectBuf.remaining() > 0) {//如果已压缩缓冲区中有数据&#xff0c;必须先取出其中的数据&#xff0c;才能输入return false;}// Check if zlib has consumed all inputif (uncompressedDirectBufLen <&#61; 0) {//判断未压缩缓冲的大小是否小于等于0// Check if we have consumed all user-inputif (userBufLen <&#61; 0) {//判断之前用户的缓冲数据都已经出来完毕了return true;} else {setInputFromSavedData();}}return false;}
里面经过了很多层的判断&#xff0c;比如第一个过滤条件就是首先取出压缩好后的数据&#xff0c;才能允许再次输入&#xff0c;而且要解决掉上次的输入数据&#xff0c;我看完这段代码之后&#xff0c;跟原先的根据未压缩缓冲区是否满不满来判断能否输入的设想完全不一样&#xff0c;因为某些资料就是这么说的&#xff0c;一想还挺有道理的&#xff0c;看完代码之后才发现真理不是这样的。如果是判断未压缩缓冲区是否满判断&#xff0c;第二个判断直接就不符号&#xff0c;返回false了。

         下面是finish()代表输入的结束标记&#xff0c;操作很简单&#xff1a;

public synchronized void finish() {//输入结束标识改为truefinish &#61; true;}        下面是关键的Compress压缩操作的执行了:

public synchronized int compress(byte[] b, int off, int len) throws IOException {if (b &#61;&#61; null) {throw new NullPointerException();}if (off <0 || len <0 || off > b.length - len) {throw new ArrayIndexOutOfBoundsException();}int n &#61; 0;// Check if there is compressed data//判断已压缩缓冲区中是否还有数据n &#61; compressedDirectBuf.remaining();if (n > 0) {n &#61; Math.min(n, len);//取出放入传入的输出缓冲中((ByteBuffer)compressedDirectBuf).get(b, off, n);return n;}// Re-initialize the zlib&#39;s output direct buffer//如果已压缩缓冲中没有数据了&#xff0c;重新设置compressedDirectBufcompressedDirectBuf.rewind();compressedDirectBuf.limit(directBufferSize);// Compress data//调用压缩数据的native方法,最后未压缩的数据就会被压缩后转入已压缩的缓冲中n &#61; deflateBytesDirect();compressedDirectBuf.limit(n);// Get atmost &#39;len&#39; bytesn &#61; Math.min(n, len);//将此时压缩好后的数据再从已压缩缓冲中取出((ByteBuffer)compressedDirectBuf).get(b, off, n);return n;}操作比较多&#xff0c;归归类

1.如果已压缩缓冲区中还有数据&#xff0c;先取出compressedDirectBuf到输出缓冲中&#xff0c;就是传入的b中&#xff0c;操作结束

2.如果已压缩缓冲区中没有数据&#xff0c;则会deflateBytesDirect()调用方法压缩数据&#xff0c;然后再执行1操作&#xff0c;取出缓冲数据&#xff0c;操作结束

我们找到关键的deflateBytesDirect方法&#xff0c;发现如下:

private native static void initIDs();private native static long init(int level, int strategy, int windowBits);private native static void setDictionary(long strm, byte[] b, int off,int len);private native int deflateBytesDirect();private native static long getBytesRead(long strm);private native static long getBytesWritten(long strm);private native static void reset(long strm);private native static void end(long strm);

发现了一堆的native的方法&#xff0c;也就是说&#xff0c;这些更加底层的实现是通过JNI的方式被调用的&#xff0c;但是我们基本能猜到&#xff0c;在这个方法里就是2个缓冲区方法的压缩转移处理。最后一个方法是判断是否结束方法

public synchronized boolean finished() {// Check if &#39;zlib&#39; says its &#39;finished&#39; and// all compressed data has been consumed//判断压缩过程是佛结束&#xff0c;判断已压缩缓冲中是否还有未取出的数据return (finished && compressedDirectBuf.remaining() &#61;&#61; 0);}

以上就是压缩操作的主要步骤&#xff0c;解压操作思路类似&#xff0c;不展开分析了。




推荐阅读
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • java实现rstp格式转换使用ffmpeg实现linux命令第一步安装node.js和ffmpeg第二步搭建node.js启动websocket接收服务
    java实现rstp格式转换使用ffmpeg实现linux命令第一步安装node.js和ffmpeg第二步搭建node.js启动websocket接收服务第三步java实现 ... [详细]
  • PHP输出缓冲控制Output Control系列函数详解【PHP】
    后端开发|php教程PHP,输出缓冲,Output,Control后端开发-php教程概述全景网页源码,vscode如何打开c,ubuntu强制解锁,sts启动tomcat慢,sq ... [详细]
author-avatar
mobiledu2502897737
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有