热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop源码学习——DistributedFileSystem、DFSClient、Namenode三者的协作(4)

我们先来看看Hadoop源码中对DFSClient的注释说明:*********************************************************DF

我们先来看看Hadoop源码中对DFS Client的注释说明:

/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
********************************************************/

显然,DFSClient在DistributedFileSystem和NameNode和之间起到了桥梁的作用。hdfs用户需要获取一个DistributedFileSystem实例对象来处理文件系统的一些任务。而DistributedFileSystem实例对象又需要使用DFSClient来完成用户提出的任务需求。而DFSClient则又通过ClientProtocol来连接Namenode,以及直接连接Datanode来读取block数据。这就是它们三者的协作关系。
接下来,我们来看看它们具体的实现过程。接上一篇,ls方法里的第二个步骤,需要根据Path对象来获取FileSystem对象:

FileSystem srcFs = srcPath.getFileSystem(this.getConf());

srcPath变量是Path对象,接着我们来看看Path对象里的getFileSystem这个方法:

public FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(this.toUri(), conf);
}

马上,又获取到Path对象的uri格式,接着调用FileSystem对象的get方法。

String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}

在FileSystem对象的get方法中会先解析uri的scheme和authority。scheme确定文件系统的类型,authority确定访问地址(ip)和端口(port)。如果scheme和authority都不为null,则正常获取FileSystem对象;如果scheme不为null,而authority为null,且scheme和默认的匹配,则根据默认的scheme和authority来创建FileSystem对象;若scheme和authority同为null,则也根据默认的scheme和authority来创建FileSystem对象。
在我们之前的例子里“-ls /test”,是scheme和authority同为null的情况,所以需要默认的scheme和authority来创建FileSystem对象。那么,我们来看看默认获取scheme和authority的过程。
跳转到FileSystem对象的get(Conf)方法:

public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}

public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));
}

其中,FS_DEFAULT_NAME_KEY=”fs.default.name“。conf.get(FS_DEFAULT_NAME_KEY, “file:///”)从配置文件里获取属性fs.default.name的值(这和我们对配置文件的配置有关,我们配置的是”hdfs://localhost“),如果获取为空,则设为”file:///”。所以我们可以预见,默认的uri的scheme是hdfs,默认的uri的authority是localhost。获取到默认的uri后,然后再返回上面的get方法重新解析uri。解析获取scheme和authority都不为空,于是继续执行。

String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);

继续执行过程有一步是说,看看配置中有没有配置fs.hdfs.impl.disable.cache这个参数,如果设置里这个参数为true,则不在缓存中创建文件系统。如果设置了false或者没有设置,则在缓存中创建文件系统。
因为,我们是没有设置这个参数的,所以接下来会在缓存中创建文件系统。缓存的实现具体实现我先不看,但是个值得关注的点。需要注意一点的是,Cache是FileSystem的内部静态类。
我们继续看到Cache里的get方法:

FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
FileSystem fs = null;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}

需要指出的是,这里的map是Cache类的一个成员变量,map的key是封装了uri和conf的一个Key对象(是Cache类的一个内部静态类),value是FileSystem类型。
首先,get方法将uri和conf封装成一个key值,然后企图从map中根据key值(以多线程并发)来获取相应的FileSystem。如果获取到了,则直接返回从map中获取的FileSystem,否则创建新的FileSystem:

fs = createFileSystem(uri, conf); // 创建文件系统

private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
Class clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
LOG.debug("Creating filesystem for " + uri);
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}

我们可以看到,在FileSystem类的createFileSystem方法中,先用scheme在conf中找到对应的FileSystem的类类型(例如,scheme:hdfs对应了DistributedFileSystem),然后利用Java反射机制来实例化该类类型,即创建一个具体的FileSystem对象。
接下来,还有非常重要的一步,那就是DistributedFileSystem.initialize(uri, conf)。
这里,要提两点。第二点非常重要!
一、我自己目前看到的,hadoop的源码中很多对类的构造就分两步,先是构造函数初始化一些内部变量(其实,DistributedFileSystem类的构造函数已经不做任何事了),然后再用initialize方法初始化一些变量。这肯定有它的设计目的在,暂不深究,先知道有这么个意思就好。
二、这里的initialize方法是DistributedFileSystem类的方法,不是FileSystem类的。之前提到过,FileSystem是一个抽象类,DistributedFileSystem继承了它。所以,这里fs是父抽象类对子类的一个引用。如果子类没有重写父类的方法,则调用父类方法;如果子类重写了父类的方法,则默认调用子类重写的方法。另外加一句,如果是子类新添加的方法,那么这种方法对该引用是不可见的。
而这里的initialize方法就属于子类重写了父方法的那种。我一开始没发现(java基础薄弱),然后在FileSystem类里的initialize方法里打转转。
好,言归正传,继续看DistributedFileSystem类的initialize方法做了些什么。

public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
this.dfs = new DFSClient(namenode, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}

首先,它调用了super.initialize(uri, conf)方法,其实就是FileSystem.initialize(uri, conf),也就是我掉过的坑,现在知道为什么掉入了那个坑就出不来了吧。
然后,设置了配置setConf(conf)。
接着,创建了一个InetSocketAddress对象namenode(一个由hostname、IP、port构成的对象,并实现了序列化),用来socket通信,还没深入了解。
以及,初始化uri、workingDir,以及我们今天的另一个主角DFSClient。
我们接下来就来看看DFSClient创建的过程(这是DistributedFileSystem类的initialize过程的一部分,我们不要忘了我们的目的,以及我们开篇提到过的,DistributedFileSystem需要使用DFSCclient类来完成在文件系统上的任务。)

public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}

提一句,FileSystem.Statistics对象记录了整个文件系统的一些信息,包括读/写字节数、读/写次数等等。

/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
* 创建一个DFSClient用来连接到给定的nameNodeAddr或者rpcNameNode, 两者必有一个为null(为什么呢?)
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
this.cOnf= conf;
this.stats = stats;
this.nnAddress = nameNodeAddr;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.timeoutValue = this.socketTimeout;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
this.hdfsTimeout = Client.getTimeout(conf);
ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
this.clientName = "DFSClient_" + taskId + "_" +
r.nextInt() + "_" + Thread.currentThread().getId();
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);```
DFSClient初始化了很多变量,有配置信息conf、文件系统的统计信息stats、Namenode的socket地址、socket连接超时上限、datanode写超时上限、socket工厂、写数据包的大小、block获取重复次数、默认块大小、默认块备份个数等等。
**这里我先留一个问题,hadoop源码的DFSClient类的注释里说明:创建一个DFSClient用来连接到给定的nameNodeAddr或者rpcNameNode, 两者必有一个为null。为什么两者必有一个是null?这个我不明白。这应该和hadoop ipc机制有关。也是我接下来需要关注的重点。**
接下来,DFSClient的创建来到一个**非常重要的步骤**:

if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode, conf);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
“Expecting exactly one of nameNodeAddr and rpcNamenode being null: “
+ “nameNodeAddr=” + nameNodeAddr + “, rpcNamenode=” + rpcNamenode);
}

在nameNodeAddr不为null,而rpcNamenode为null的情况下(例如,我在调试代码事,nameNodeAddr=localhost/127.0.0.1:8020,rpcNamenode为null),会创建RPCNamenode对象(createRPCNamenode)和Namenode对象(createNamenode),用来建立到Namenode节点的IPC连接。这里就涉及hadoop ipc机制的核心,我现在还不理解,需要等学习之后再来讲。

// read directly from the block file if configured.
this.shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(“Short circuit read is ” + shortCircuitLocalReads);
}
this.cOnnectToDnViaHostname= conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(“Connect to datanode via hostname is ” + connectToDnViaHostname);
}
String localInterfaces[] =
conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
if (null == localInterfaces) {
localInterfaces = new String[0];
}
this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
LOG.debug(“Using local interfaces [” +
StringUtils.join(“,”,localInterfaces)+ “] with addresses [” +
StringUtils.join(“,”,localInterfaceAddrs) + “]”);
}

接着,DFSClient又设置了一些配置参数。好吧,这些参数意思也不是很明确。shortCircuitLocalReads的意思是说是否支持本地读取;connectToDnViaHostname是说是否支持用hostname来连接DataNode。
至此,DFSClient就创建好了(其实我们还没讲它的核心T^T)。那么,DistributedFileSystem的initialize过程也Ok了。接下来要回到哪儿了呢?
就要返回到FsShell里ls第二个步骤(获取FileSystem)完了之后的那个步骤(在FileSystem上获取文件信息)。
第三个步骤,我们简略的讲,但我们需要明确一点,看下面。
获取文件信息这个过程会从FsShell类通过方法调用到FileSystem类,再到DistributedFileSystem类,再到DFSClient类里的getFileInfo方法。这个getFileInfo方法又调用了namenode.getFileInfo方法。**注意,namenode是ClientProtocol类型的,是一个接口,没有任何方法的实现**

// DFSClient类里的getFileInfo
public HdfsFileStatus getFileInfo(String src) throws IOException {
checkOpen(); // 检查路径是否为空或null
try {
System.out.println(“getFileInfo’s src ” + src);
return namenode.getFileInfo(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
}
}

那么,namenode的getFileInfo方法又怎能获取到文件信息呢?
我们想到一点,这个namenode就是之前我们创建DFSClient是初始化的namenode变量,它建立了DFSClient和Namenode节点之间的连接。那么,会不会因为通信的关系,其实是调用了Namenode里的getFileInfo方法呢?(其实这样想还有另外一个原因,那就是这个变量的名字)
然后我们看到,在Namenode类里,果然有getFileInfo方法,而且有具体的实现:

public HdfsFileStatus getFileInfo(String src) throws IOException {
System.out.println(“call this function NameNode->getFileInfo()”);
myMetrics.incrNumFileInfoOps();
return namesystem.getFileInfo(src);
}

而且,我们通过跟踪调用,确实证明了调用Namenode的getFileInfo这个方法。我们通过这个方法继续跟踪下去,就能看到获取文件信息的具体操作:

/**

  • Create FileStatus by file INode
    */
    private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
    // length is zero for directories
    return new HdfsFileStatus(
    node.isDirectory() ? 0 : ((INodeFile)node).computeContentSummary().getLength(),
    node.isDirectory(),
    node.isDirectory() ? 0 : ((INodeFile)node).getReplication(),
    node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
    node.getModificationTime(),
    node.getAccessTime(),
    node.getFsPermission(),
    node.getUserName(),
    node.getGroupName(),
    path);
    }

获取的文件信息最终以FileStatus对象的形式返回给FsShell里的ls方法,ls再从FileStatus对象中获取具体的文件信息,将其打印出来:
上面所说,我们需要明确的一点就是,**DFSClient会把方法的调用发给Namenode节点来执行,且执行的是Namenode自己的方法。**提一点,我们发现这里的方法接口是一样的,因为Namnode也实现了ClientProtocol接口。
###总结一下###
* DFSClient在DistributedFileSystem和NameNode和之间起到了桥梁的作用
* 创建DistributedFileSystem类用到了Java反射机制
* DistributedFileSystem.initialize方法初始化了uri、workingDir变量,以及非常重要的创建DFSClient对象
* 创建DFSClient对象除了初始化一些变量外,还建立了和Namenode节点的连接

推荐阅读
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文详细介绍了PHP中与URL处理相关的三个函数:http_build_query、parse_str和查询字符串的解析。通过示例和语法说明,讲解了这些函数的使用方法和作用,帮助读者更好地理解和应用。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • 本文详细介绍了使用C#实现Word模版打印的方案。包括添加COM引用、新建Word操作类、开启Word进程、加载模版文件等步骤。通过该方案可以实现C#对Word文档的打印功能。 ... [详细]
  • 本文介绍了安全性要求高的真正密码随机数生成器的概念和原理。首先解释了统计学意义上的伪随机数和真随机数的区别,以及伪随机数在密码学安全中的应用。然后讨论了真随机数的定义和产生方法,并指出了实际情况下真随机数的不可预测性和复杂性。最后介绍了随机数生成器的概念和方法。 ... [详细]
  • 本文介绍了一种求解最小权匹配问题的方法,使用了拆点和KM算法。通过将机器拆成多个点,表示加工的顺序,然后使用KM算法求解最小权匹配,得到最优解。文章给出了具体的代码实现,并提供了一篇题解作为参考。 ... [详细]
author-avatar
手机用户2502898521
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有