热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

HDFS文件创建流程

文件夹的创建是一个相对简单的过程,主要是通过FileSystem中的mkdirs()方法,这个方法在DFSClient实例中调用同名方法mkdirs(),通过Hadoop本身的RPC机制调

文件夹的创建是一个相对简单的过程,主要是通过FileSystem中的mkdirs()方法,这个方法在DFSClient实例中调用同名方法mkdirs(),通过Hadoop本身的RPC机制调用Namenode的mkdirs()方法,最终这个调用PUSH到FSNameSystem的mkdirsInternal方法,这个方法主要就是检验访问权限,最后通过FSDirectory的unprotectedMkdir()方法,构建一个INodeDirectory实例添加到文件系统的目录树中。 

文件节点的创建与添加相对比较麻烦,主要步骤如下:  
FileSystem的create方法返回一个很重要的类FSDataOutputStream,这一点也比较好理解,就像java中的文件流一样,创建一个文件写入流对文件内容进行追加,首先我们看文件创建阶段namenode主要做了什么事情(权限验证以及租约验证这些前面都已经有提到,下面的内容就会掠过这一部分) 

DfSOutputStream在实例构建时,通过Hadoop本身的RPC机制调用Namenode的create方法,最终这个调用PUSH到FSNameSystem的StartFileInternal方法,需要做权限验证,租约检验等工作,这个方法主要作用就是创建一个INodeFileUnderConstruction实例(上面已经提过,文件写入过程中都会有一个INodeFileUnderConstruction与这个文件对应),这个实例最后通过FSDirectoty的addNode()方法添加到文件系统目录数中,这个时候文件创建操作就算完成了重要的第一步,文件系统中已经有了这个文件的记录。 

下面就涉及到文件的写入操作(相当复杂的部分)  
这个时候就需要用到返回的DfSOutputStream对象。这部分太复杂了,我们先分析一些基本模块儿,逐步吃透这部分的实现。 

整个分布式文件系统中网络通讯部分分为两类: 
1.命令类调用(这部分通过HADOOP的RPC机制进行支持) 
2.流式数据传输(这部分通过HADOOP的流式数据传输协议支持)  
为了保证数据的正确性,hadoop在多个关键处理单元做了数据检验操作,在流式数据网络传输部分通过校验和保证数据传输正常。 
Client在DfSOutputStream对象调用write方法时,系统并不会马上把数据写入SOCKET中,而是逐个构建Package并将这些Package加入一个队列。 
在DfSOutputStream对象构建时,系统通过Hadoop本身的RPC机制调用Namenode的create方法后,会启动一个后台线程 streamer.start(); 这个线程的主要目的就是将上述的package队列写入SOCKET中。 
右图为DfSOutputStream的继承关系 
其中FSOutputSummer这个类其实质是一个decorator设计模式的实现,主要的目的就是在OutputStream的void write(byte b[], int off, int len)方法中增加一些功能,上文已经提过,文件数据传输的同时,系统会在传输的数据中增加检验和数据,系统收到数据后对数据进行校验,保证数据传输的正确性,但是用户在对文件输出流进行操作的时候并不需要关注校验和数据,用户只需要不断的调用write方法在目标文件中追加数据。 
注:我们通常可以使用继承来实现功能的拓展,如果这些需要拓展的功能的种类很繁多,那么势必生成很多子类,增加系统的复杂性,同时,使用继承实现功能拓展,我们必须可预见这些拓展功能,这些功能是编译时就确定了,是静态的.使用Decorator的理由是:这些功能需要由用户动态决定加入的方式和时机.Decorator提供了"即插即用"的方法,在运行期间决定何时增加何种功能. 
我们看一下FSOutputSummer中write(byte b[], int off, int len)的实现 
public synchronized void write(byte b[], int off, int len)  throws  IOException  { 
    if (off <0 || len <0 || off > b.length - len) { 
throw new ArrayIndexOutOfBoundsException();     

for (int n=0;n

}

可以看到不断调用write1方法,保证数据发送的完整性。 
那么write1方法又做了什么事情呢,write1将用户需要写入的数据流首先写到自己的BUFFER中,达到一定数量(基本是一个chunk的大小)后进行CheckSum方法调用得到一段数据的校验和,然后通过
writeChecksumChunk这个方法将数据以及该部分数据的校验和,按照一定格式要求一并写入Stream。 
writeChecksumChunk这个方法的主要作用就是将用户写入的数据以及该部分数据的校验和做为参数调用writeChunk()方法,这个方法是一个虚方法,真正的实现在DFSOutputStream这个类中,这也合情合理,本身FSOutputSummer这个类的作用仅仅是在输出流中增加校验和数据,至于数据是如何进行传输的是通过DFSOutputStream来实现的。 
那么接下来需要说明的就是DFSOutputStream的writeChunk这个方法了。   HDFS流式数据网络传输的基本单位有哪些呢? 
chunk->package->block 
我们上文已经提过:等用户写入的数据达到一定数量(基本是一个chunk的大小)后就会对这段数据取校验和。一定数量的chunk就会组成一个package,这个package就是最终进行网络传输的基本单元,datanode收到package后,将这些package组合起来最终得到一个block。 
我们接下来通过实际主要的代码了解这部分功能的实现:  
currentPacket这个对象初始化的时候就是null,第一次写入数据时这个判断成立 if (currentPacket == null) { 
currentPacket = new Packet(packetSize, chunksPerPacket, bytesCurBlock); ...  
//下面开始构建package包。 
//在package包中增加一个chunk,首先添加这个chunk所包含数据的checksum currentPacket.writeChecksum(checksum, 0, cklen); 
currentPacket.writeData(b, offset, len);  //然后添加这个chunk所包含的数据 currentPacket.numChunks++;  //增加这个package所包含的chunk个数 bytesCurBlock += len;  //当前已经写入的byte个数 // If packet is full, enqueue it for transmission 
//如果这个package已经达到一定的chunk数量,准备实际的传输操作 
if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize) { ...... 
if (bytesCurBlock == blockSize)  //如果用户写入的数据,已经达到一个block缺省大小(64M) { 
//设置当前的package是某一个block的最后一个package currentPacket.lastPacketInBlock = true; //清除一些变量的值 bytesCurBlock = 0; lastFlushOffset = -1; } 
//这三段代码是关键的一部分代码,将已经构建完成的package写入一个dataQueue队列,由另一个线程(就是我们开始提到的:启动一个后台线程 streamer.start(); 这个线程的主要目的就是将上述的package队列写入(SOCKET中)从该队列中不断取出package,进行实际的网络传输 
dataQueue.addLast(currentPacket);    //产生event,进而通知并唤醒等待线程 
dataQueue.notifyAll();   //这一步也很重要,设置currentPacket 为空,表示这个package已经满了,需要new一个新的package继续接收用户后面进一步需要写入的数据。 
currentPacket = null; 
// If this was the first write after reopening a file, then 
// the above write filled up any partial chunk. Tell the summer to generate full // crc chunks from now on. if (appendChunk) { 
appendChunk = false; 
resetChecksumChunk(bytesPerChecksum); } 
int psize = Math.min((int) (blockSize - bytesCurBlock),writePacketSize); computePacketChunkSize(psize, bytesPerChecksum); }  
computePacketChunkSize这个方法的主要作用是计算两个参数: 1.chunksPerPacket 
接下来的package需要承载多少个chunk;因为最后一个package承载的chunk个数与文件大小也有关系。 2.packetSize 
接下来的package的大小。 
以上两个参数与判断是否需要new一个新的PACKAGE很有关系。  
private void computePacketChunkSize(int psize, int csize) { 
int chunkSize = csize + checksum.getChecksumSize(); 
int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; 
chunksPerPacket = Math.max((psize - n + chunkSize - 1) /chunkSize,1); packetSize = n + chunkSize * chunksPerPacket; if (LOG.isDebugEnabled()) } 
可以看到构建的package不断添加到dataQueue这个队列,streamer.start()这个线程从中弹出package进行实际网络传输操作。 
下面就涉及到比较复杂的网络传输协议部分。 我们先看一下这部分的流程: 
1.上面已经讲过,开始的一步就是客户端调用create方法,在namenode上的目录树中注册一个INodeFileUnderConstruction节点,并得到一个DfSOutputStream。 
2.用户得到这个outputStream后就可以进行写入操作,用户写入的数据就不断构建成package写入dataQueue这个队列。 
3.streamer.start()这个线程从dataQueue队列中取出package进行实际网络传输操作。 
下面的网络传输流程为关键流程: 

4.streamer是一个DataStreamer的实例,这是一个线程实例。大家知道HDFS中的文件数据会分成很多64M大小的block,所以在HDFS中保存文件数据第一步就是在namenode上申请一个特殊的blockID(当然还是通过RPC调用的方式)。

更多精彩内容请关注:http://bbs.superwu.cn 

关注超人学院微信二维码:




推荐阅读
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 本文介绍了深入浅出Linux设备驱动编程的重要性,以及两种加载和删除Linux内核模块的方法。通过一个内核模块的例子,展示了模块的编译和加载过程,并讨论了模块对内核大小的控制。深入理解Linux设备驱动编程对于开发者来说非常重要。 ... [详细]
  • This article discusses the efficiency of using char str[] and char *str and whether there is any reason to prefer one over the other. It explains the difference between the two and provides an example to illustrate their usage. ... [详细]
  • 全面介绍Windows内存管理机制及C++内存分配实例(四):内存映射文件
    本文旨在全面介绍Windows内存管理机制及C++内存分配实例中的内存映射文件。通过对内存映射文件的使用场合和与虚拟内存的区别进行解析,帮助读者更好地理解操作系统的内存管理机制。同时,本文还提供了相关章节的链接,方便读者深入学习Windows内存管理及C++内存分配实例的其他内容。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • BZOJ1233 干草堆单调队列优化DP
    本文介绍了一个关于干草堆摆放的问题,通过使用单调队列来优化DP算法,求解最多可以叠几层干草堆。具体的解题思路和转移方程在文章中进行了详细说明,并给出了相应的代码示例。 ... [详细]
  • linux进阶50——无锁CAS
    1.概念比较并交换(compareandswap,CAS),是原⼦操作的⼀种,可⽤于在多线程编程中实现不被打断的数据交换操作࿰ ... [详细]
  •     这里使用自己编译的hadoop-2.7.0版本部署在windows上,记得几年前,部署hadoop需要借助于cygwin,还需要开启ssh服务,最近发现,原来不需要借助cy ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • 项目需要将音视频文件上传服务器,考虑并发要求高,通过七牛来实现。直接上代码usingQiniu.IO;usingQiniu.IO.Resumable;usingQiniu.RPC; ... [详细]
  • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
  • MR程序的几种提交运行模式本地模型运行1在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行-- ... [详细]
  • 伸缩性|发生_分布式文件系统设计,该从哪些方面考虑?
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了分布式文件系统设计,该从哪些方面考虑?相关的知识,希望对你有一定的参考价值。点击上方关注“ ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
author-avatar
请让我来打酱油
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有