热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop源码学习——DistributedFileSystem、DFSClient、Namenode三者的协作(4)

我们先来看看Hadoop源码中对DFSClient的注释说明:*********************************************************DF

我们先来看看Hadoop源码中对DFS Client的注释说明:

/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
********************************************************/

显然,DFSClient在DistributedFileSystem和NameNode和之间起到了桥梁的作用。hdfs用户需要获取一个DistributedFileSystem实例对象来处理文件系统的一些任务。而DistributedFileSystem实例对象又需要使用DFSClient来完成用户提出的任务需求。而DFSClient则又通过ClientProtocol来连接Namenode,以及直接连接Datanode来读取block数据。这就是它们三者的协作关系。
接下来,我们来看看它们具体的实现过程。接上一篇,ls方法里的第二个步骤,需要根据Path对象来获取FileSystem对象:

FileSystem srcFs = srcPath.getFileSystem(this.getConf());

srcPath变量是Path对象,接着我们来看看Path对象里的getFileSystem这个方法:

public FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(this.toUri(), conf);
}

马上,又获取到Path对象的uri格式,接着调用FileSystem对象的get方法。

String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}

在FileSystem对象的get方法中会先解析uri的scheme和authority。scheme确定文件系统的类型,authority确定访问地址(ip)和端口(port)。如果scheme和authority都不为null,则正常获取FileSystem对象;如果scheme不为null,而authority为null,且scheme和默认的匹配,则根据默认的scheme和authority来创建FileSystem对象;若scheme和authority同为null,则也根据默认的scheme和authority来创建FileSystem对象。
在我们之前的例子里“-ls /test”,是scheme和authority同为null的情况,所以需要默认的scheme和authority来创建FileSystem对象。那么,我们来看看默认获取scheme和authority的过程。
跳转到FileSystem对象的get(Conf)方法:

public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}

public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));
}

其中,FS_DEFAULT_NAME_KEY=”fs.default.name“。conf.get(FS_DEFAULT_NAME_KEY, “file:///”)从配置文件里获取属性fs.default.name的值(这和我们对配置文件的配置有关,我们配置的是”hdfs://localhost“),如果获取为空,则设为”file:///”。所以我们可以预见,默认的uri的scheme是hdfs,默认的uri的authority是localhost。获取到默认的uri后,然后再返回上面的get方法重新解析uri。解析获取scheme和authority都不为空,于是继续执行。

String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);

继续执行过程有一步是说,看看配置中有没有配置fs.hdfs.impl.disable.cache这个参数,如果设置里这个参数为true,则不在缓存中创建文件系统。如果设置了false或者没有设置,则在缓存中创建文件系统。
因为,我们是没有设置这个参数的,所以接下来会在缓存中创建文件系统。缓存的实现具体实现我先不看,但是个值得关注的点。需要注意一点的是,Cache是FileSystem的内部静态类。
我们继续看到Cache里的get方法:

FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
FileSystem fs = null;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}

需要指出的是,这里的map是Cache类的一个成员变量,map的key是封装了uri和conf的一个Key对象(是Cache类的一个内部静态类),value是FileSystem类型。
首先,get方法将uri和conf封装成一个key值,然后企图从map中根据key值(以多线程并发)来获取相应的FileSystem。如果获取到了,则直接返回从map中获取的FileSystem,否则创建新的FileSystem:

fs = createFileSystem(uri, conf); // 创建文件系统

private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
Class clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
LOG.debug("Creating filesystem for " + uri);
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}

我们可以看到,在FileSystem类的createFileSystem方法中,先用scheme在conf中找到对应的FileSystem的类类型(例如,scheme:hdfs对应了DistributedFileSystem),然后利用Java反射机制来实例化该类类型,即创建一个具体的FileSystem对象。
接下来,还有非常重要的一步,那就是DistributedFileSystem.initialize(uri, conf)。
这里,要提两点。第二点非常重要!
一、我自己目前看到的,hadoop的源码中很多对类的构造就分两步,先是构造函数初始化一些内部变量(其实,DistributedFileSystem类的构造函数已经不做任何事了),然后再用initialize方法初始化一些变量。这肯定有它的设计目的在,暂不深究,先知道有这么个意思就好。
二、这里的initialize方法是DistributedFileSystem类的方法,不是FileSystem类的。之前提到过,FileSystem是一个抽象类,DistributedFileSystem继承了它。所以,这里fs是父抽象类对子类的一个引用。如果子类没有重写父类的方法,则调用父类方法;如果子类重写了父类的方法,则默认调用子类重写的方法。另外加一句,如果是子类新添加的方法,那么这种方法对该引用是不可见的。
而这里的initialize方法就属于子类重写了父方法的那种。我一开始没发现(java基础薄弱),然后在FileSystem类里的initialize方法里打转转。
好,言归正传,继续看DistributedFileSystem类的initialize方法做了些什么。

public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
this.dfs = new DFSClient(namenode, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}

首先,它调用了super.initialize(uri, conf)方法,其实就是FileSystem.initialize(uri, conf),也就是我掉过的坑,现在知道为什么掉入了那个坑就出不来了吧。
然后,设置了配置setConf(conf)。
接着,创建了一个InetSocketAddress对象namenode(一个由hostname、IP、port构成的对象,并实现了序列化),用来socket通信,还没深入了解。
以及,初始化uri、workingDir,以及我们今天的另一个主角DFSClient。
我们接下来就来看看DFSClient创建的过程(这是DistributedFileSystem类的initialize过程的一部分,我们不要忘了我们的目的,以及我们开篇提到过的,DistributedFileSystem需要使用DFSCclient类来完成在文件系统上的任务。)

public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}

提一句,FileSystem.Statistics对象记录了整个文件系统的一些信息,包括读/写字节数、读/写次数等等。

/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
* 创建一个DFSClient用来连接到给定的nameNodeAddr或者rpcNameNode, 两者必有一个为null(为什么呢?)
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
this.cOnf= conf;
this.stats = stats;
this.nnAddress = nameNodeAddr;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.timeoutValue = this.socketTimeout;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
this.hdfsTimeout = Client.getTimeout(conf);
ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
this.clientName = "DFSClient_" + taskId + "_" +
r.nextInt() + "_" + Thread.currentThread().getId();
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);```
DFSClient初始化了很多变量,有配置信息conf、文件系统的统计信息stats、Namenode的socket地址、socket连接超时上限、datanode写超时上限、socket工厂、写数据包的大小、block获取重复次数、默认块大小、默认块备份个数等等。
**这里我先留一个问题,hadoop源码的DFSClient类的注释里说明:创建一个DFSClient用来连接到给定的nameNodeAddr或者rpcNameNode, 两者必有一个为null。为什么两者必有一个是null?这个我不明白。这应该和hadoop ipc机制有关。也是我接下来需要关注的重点。**
接下来,DFSClient的创建来到一个**非常重要的步骤**:

if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode, conf);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
“Expecting exactly one of nameNodeAddr and rpcNamenode being null: “
+ “nameNodeAddr=” + nameNodeAddr + “, rpcNamenode=” + rpcNamenode);
}

在nameNodeAddr不为null,而rpcNamenode为null的情况下(例如,我在调试代码事,nameNodeAddr=localhost/127.0.0.1:8020,rpcNamenode为null),会创建RPCNamenode对象(createRPCNamenode)和Namenode对象(createNamenode),用来建立到Namenode节点的IPC连接。这里就涉及hadoop ipc机制的核心,我现在还不理解,需要等学习之后再来讲。

// read directly from the block file if configured.
this.shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(“Short circuit read is ” + shortCircuitLocalReads);
}
this.cOnnectToDnViaHostname= conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(“Connect to datanode via hostname is ” + connectToDnViaHostname);
}
String localInterfaces[] =
conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
if (null == localInterfaces) {
localInterfaces = new String[0];
}
this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
LOG.debug(“Using local interfaces [” +
StringUtils.join(“,”,localInterfaces)+ “] with addresses [” +
StringUtils.join(“,”,localInterfaceAddrs) + “]”);
}

接着,DFSClient又设置了一些配置参数。好吧,这些参数意思也不是很明确。shortCircuitLocalReads的意思是说是否支持本地读取;connectToDnViaHostname是说是否支持用hostname来连接DataNode。
至此,DFSClient就创建好了(其实我们还没讲它的核心T^T)。那么,DistributedFileSystem的initialize过程也Ok了。接下来要回到哪儿了呢?
就要返回到FsShell里ls第二个步骤(获取FileSystem)完了之后的那个步骤(在FileSystem上获取文件信息)。
第三个步骤,我们简略的讲,但我们需要明确一点,看下面。
获取文件信息这个过程会从FsShell类通过方法调用到FileSystem类,再到DistributedFileSystem类,再到DFSClient类里的getFileInfo方法。这个getFileInfo方法又调用了namenode.getFileInfo方法。**注意,namenode是ClientProtocol类型的,是一个接口,没有任何方法的实现**

// DFSClient类里的getFileInfo
public HdfsFileStatus getFileInfo(String src) throws IOException {
checkOpen(); // 检查路径是否为空或null
try {
System.out.println(“getFileInfo’s src ” + src);
return namenode.getFileInfo(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
}
}

那么,namenode的getFileInfo方法又怎能获取到文件信息呢?
我们想到一点,这个namenode就是之前我们创建DFSClient是初始化的namenode变量,它建立了DFSClient和Namenode节点之间的连接。那么,会不会因为通信的关系,其实是调用了Namenode里的getFileInfo方法呢?(其实这样想还有另外一个原因,那就是这个变量的名字)
然后我们看到,在Namenode类里,果然有getFileInfo方法,而且有具体的实现:

public HdfsFileStatus getFileInfo(String src) throws IOException {
System.out.println(“call this function NameNode->getFileInfo()”);
myMetrics.incrNumFileInfoOps();
return namesystem.getFileInfo(src);
}

而且,我们通过跟踪调用,确实证明了调用Namenode的getFileInfo这个方法。我们通过这个方法继续跟踪下去,就能看到获取文件信息的具体操作:

/**

  • Create FileStatus by file INode
    */
    private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
    // length is zero for directories
    return new HdfsFileStatus(
    node.isDirectory() ? 0 : ((INodeFile)node).computeContentSummary().getLength(),
    node.isDirectory(),
    node.isDirectory() ? 0 : ((INodeFile)node).getReplication(),
    node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
    node.getModificationTime(),
    node.getAccessTime(),
    node.getFsPermission(),
    node.getUserName(),
    node.getGroupName(),
    path);
    }

获取的文件信息最终以FileStatus对象的形式返回给FsShell里的ls方法,ls再从FileStatus对象中获取具体的文件信息,将其打印出来:
上面所说,我们需要明确的一点就是,**DFSClient会把方法的调用发给Namenode节点来执行,且执行的是Namenode自己的方法。**提一点,我们发现这里的方法接口是一样的,因为Namnode也实现了ClientProtocol接口。
###总结一下###
* DFSClient在DistributedFileSystem和NameNode和之间起到了桥梁的作用
* 创建DistributedFileSystem类用到了Java反射机制
* DistributedFileSystem.initialize方法初始化了uri、workingDir变量,以及非常重要的创建DFSClient对象
* 创建DFSClient对象除了初始化一些变量外,还建立了和Namenode节点的连接

推荐阅读
  • 本文详细介绍了 PHP 中对象的生命周期、内存管理和魔术方法的使用,包括对象的自动销毁、析构函数的作用以及各种魔术方法的具体应用场景。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 本文详细介绍了 com.apollographql.apollo.api.internal.Optional 类中的 orNull() 方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 在OpenShift上部署基于MongoDB和Node.js的多层应用程序
    本文档详细介绍了如何在OpenShift 4.x环境中部署一个包含MongoDB数据库和Node.js后端及前端的多层应用程序。通过逐步指导,读者可以轻松完成整个部署过程。 ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
  • 解决Only fullscreen opaque activities can request orientation错误的方法
    本文介绍了在使用PictureSelectorLight第三方框架时遇到的Only fullscreen opaque activities can request orientation错误,并提供了一种有效的解决方案。 ... [详细]
  • 普通树(每个节点可以有任意数量的子节点)级序遍历 ... [详细]
  • 兆芯X86 CPU架构的演进与现状(国产CPU系列)
    本文详细介绍了兆芯X86 CPU架构的发展历程,从公司成立背景到关键技术授权,再到具体芯片架构的演进,全面解析了兆芯在国产CPU领域的贡献与挑战。 ... [详细]
  • [转]doc,ppt,xls文件格式转PDF格式http:blog.csdn.netlee353086articledetails7920355确实好用。需要注意的是#import ... [详细]
  • 重要知识点有:函数参数默许值、盈余参数、扩大运算符、new.target属性、块级函数、箭头函数以及尾挪用优化《深切明白ES6》笔记目次函数的默许参数在ES5中,我们给函数传参数, ... [详细]
  • 深入解析 Lifecycle 的实现原理
    本文将详细介绍 Android Jetpack 中 Lifecycle 组件的实现原理,帮助开发者更好地理解和使用 Lifecycle,避免常见的内存泄漏问题。 ... [详细]
  • 在分析和解决 Keepalived VIP 漂移故障的过程中,我们发现主备节点配置如下:主节点 IP 为 172.16.30.31,备份节点 IP 为 172.16.30.32,虚拟 IP 为 172.16.30.10。故障表现为监控系统显示 Keepalived 主节点状态异常,导致 VIP 漂移到备份节点。通过详细检查配置文件和日志,我们发现主节点上的 Keepalived 进程未能正常运行,最终通过优化配置和重启服务解决了该问题。此外,我们还增加了健康检查机制,以提高系统的稳定性和可靠性。 ... [详细]
  • 本文详细解析了使用C++实现的键盘输入记录程序的源代码,该程序在Windows应用程序开发中具有很高的实用价值。键盘记录功能不仅在远程控制软件中广泛应用,还为开发者提供了强大的调试和监控工具。通过具体实例,本文深入探讨了C++键盘记录程序的设计与实现,适合需要相关技术的开发者参考。 ... [详细]
author-avatar
手机用户2502898521
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有