热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop3.2.1【YARN】源码分析:RPC通讯解析

--------------------------一.前言二.RPC流程概略图三.YARNRPC的主要组成协议四.ApplicationClientProtocol五.Resou

--------------------------

  • 一.前言
  • 二.RPC流程 概略图
  • 三.YARN RPC的主要组成协议
  • 四.ApplicationClientProtocol
  • 五.ResourceManagerAdministrationProtocol
  • 六.ResourceTracker
  • 七.ApplicationMasterProtocol
  • 八.ContainerManagementProtocol


一.前言

远程过程调用(Remote Procedure Call, RPC) 是一个计算机通信协议。 该协议允许运行于一台计算机的程序调用另一台计算机的子程序, 同时将网络的通信细节隐藏起来,而程序员无需额外地为这个交互作用编程。

作为一个分布式系统, Hadoop实现了自己的RPC通信协议, 它是上层多个分布式子系统(MR、 YARN、 HDFS等) 的公用网络通信模块, 保证其轻量级、 高性能。

在这里插入图片描述

典型的RPC框架, 主要包括以下几个部分。
( 1) 通信模块: 主要是请求-应答协议, 包括同步方式和异步方式。
①Stub程序: 客户端和服务端均包含Stub程序, 可将之看作代理程序。
②调度程序: 调度程序接收来自通信模块的请求消息, 并根据其中的标识选择一个Stub程序进行处理。
( 2) 客户程序/服务过程: 请求的发出者和请求的处理者。
请求过程为: 客户端程序—>客户端Stub程序—>通信模块—>远程请求—>通信模块—调度程序—服务器Stub程序—>服务程序

二.RPC流程 概略图

在这里插入图片描述

客户端:这里我们假设ConnectionId对应的Connection并不存在。在调用getConnection方法时,这里构造了Connection,由于入参ConnectionId.doPing一般为true,因此,在Connection的构造方法时,会构造相应的pingHeader写入到成员变量pingRequest中。接着将call加入到connection中后,调用了connection.setupIOstreams,这里一开始就调用了writeConnectionHeader,一共写了7个字节的内容到服务端(分别是"hrpc",Version、Service Class、AuthProtocol,显然,前面是四个字节,后面三个都是一个字节)。另外,由于这里的成员变量doPing为true,因此,这里使用PingInputStream封装了上面获取的输入流。该方法内接着调用了writeConnectionContext,该方法就是rpc的认证调用。这里的callId设置为CONNECTION_CONTEXT_CALL_ID,这里最后调用了request.write,将request中的信息写到了out中,注意,这里并没有调用其flush方法。但是,这里封装了一次请求。接着,调用了connection.sendRpcRequest,这里将请求的call继续写入到out中,并调用了flush方法。至此,客户端的流程也就完成了。

服务端:关于服务端的流程,我从Server.Listener.run讲起。这里一开始是阻塞的。当客户端调用了flush之后,这里的阻塞被打断。这里然后调用到了doAccept方法。在该方法中首先调用到了ConnectionManager.register,这里新构造了Connection并且将其加入到成员结合connections中。在Connection的构造中封装了客户端的channel、socket等相关信息。然后将新构造的connection作为attachment绑定到SelectionKey上,然后调用reader.addConnection将其加入到阻塞队列pendingConnections,并调用wakeup唤醒了reader在doRunLoop方法中的readSelector.select阻塞。而Server.Listener中的方法继续阻塞。

接下来,我们重点关注Server.Listener.Reader.doRunLoop。这里的readSelector.select阻塞打断后,由于此时的readSelector中并没有相关的selectedKeys,因此继续循环执行。接着从pendingConnections队列中取出封装了客户端相关信息的Connection,然后将Connection.channel注册到readSelector选择器上,由于客户端已经将信息flush过来,所以,这里的注册后的readSelector不会阻塞,且其中含有相应的selectedKeys,故,接下来会调用到doRead方法,接着调用到了Connection.readAndProcess。接下来另起一段着重讲解。

这里dataLengthBuffer.remaining()>0判断成立,调用channelRead将客户端的版本信息读入。接着将Version、Service Class、AuthProtocol读入到connectionHeaderBuf中,然后调用了dataLengthBuffer.flip,方便下次读取数据长度,并且将connectionHeaderRead置为true,也就是下一次遍历的时候不会再读取头部的相关信息。只是读取实际数据长度。然后,由于data == null,因此,这里为data分配空间,然后调用了processOneRpc,由于这里是封装的CONNECTION_CONTEXT_CALL_ID的call,因此,这里来到processRpcOutOfBandRequest方法,该方法完成了认证的相关流程。由于一开始connectionContextRead为false,而在processRpcOutOfBandRequest方法中被置为true,因此,这里会调用continue。这里重新读取的数据的长度,然后继续调用了processOneRpc。这一次callId不小于0,因此会调用到方法processRpcRequest。该方法是完成真正请求的,也就是说,前面的几次调用只是前期的校验。所谓真正的请求,也就是要调用服务端的相关方法。processRpcRequest方法将客户端的相关信息封装到Call,并将其加入到callQueue中。

另一方面,在Handler.run中一直在等待callQueue中成员的加入。通过其take方法可以看到内部的阻塞队列一直在轮询等待成员的加入。这里接着调用了CurCall.set(call),方便后面获取客户端的ugi。接着就调用到了RPC.Server.call,由于这里的Server继承自org.apache.hadoop.ipc.Server(该类是一个抽象类),而前面的Server覆写了后面Server的抽象方法call。最终调用了ProtobufRpcEngine.cal,该方法完成了服务端对应方法的调用,并且将结果返回。

另外,如果客户端在请求超时之后,会调用sendPing方法,用于测试服务端的服务是否仍然开启,此时的callId为PING_CALL_ID。

三.YARN RPC的主要组成协议

在这里插入图片描述

(1) ApplicationClientProtocol: clients与RM之间的协议, JobClient通过该RPC协议提交应用程序、 查询应用程序状态等。
(2) ResourceManagerAdministrationProtocol: Admin与RM之间的通信协议, Admin通过该RPC协议更新系统配置文件, 例如节点黑白名单等。
(3) ApplicationMasterProtocol: AM与RM之间的协议, AM通过该RPC协议向RM注册和撤销自己, 并为各个任务申请资源。
(4) ContainerManagementProtocol: AM与NM之间的协议, AM通过该RPC要求NM启动或者停止Container, 获取各个Container的使用状态等信息。
(5) ResourceTracker: NM与RM之间的协议, NM通过该RPC协议向RM注册, 并定时发送心跳信息汇报当前节点的资源使用情况和Container运行情况。

四.ApplicationClientProtocol

clients与RM之间的协议, JobClient通过该RPC协议提交应用程序、 查询应用程序状态、 集群状态、 节点、队列和权限控制等。

  • 应用信息

方法名称描述
getNewApplicationclient获得一个单调递增的ApplicationId用来提交Application .
响应信息中会包含集群的一些详细信息,比如集群中指定最大资源能力
submitApplicationclient向ResourceManager提交Application.
客户端需要通过SubmitApplicationRequest提供诸如队列、运行ApplicationMaster所需的资源、用于启动ApplicationMaster的容器的数据量等详细信息
ResourceManager如果接受请求的话会立即返回一个<空的>SubmitApplicationResponse,如果拒绝则直接回抛出一个异常,然而 请求需要根据getApplicationReport方法确保application被正确的提交, 从ResourceManager获取SubmitApplicationResponse并不保证RM在故障转移或重新启动之后“记住”此应用程序。
如果ResourceManager在成功保存application之前发生了故障转义或者重启,getApplicationReport方法将会抛出一个ApplicationNotFoundException异常.当clinet在通过getApplicationReport获得一个ApplicationNotFoundException时,会以相同的ApplicationSubmissionContext配置重新提交application.
在提交 的过程中,会检查application 是否存在,如果application存在,将会返回SubmitApplicationResponse.
在安全模式下,ResouceManager会在接受应用程序提交之前验证对队列等的访问权限。
failApplicationAttemptclient用来请求ResourceManager使应用程序尝试失败
forceKillApplicationclient请求ResourceManager终止已提交的Application
moveApplicationAcrossQueues将一个application移动到另外一个队列
updateApplicationPriority更新application的优先级

资源预约


方法名称描述
getNewReservationclient请求ResourceManager获取ReservationId
submitReservation提交reservation
updateReservation更新reservation
deleteReservation删除reservation
listReservations根据筛选条件查询reservation列表信息.
可通过queue, reservationId, startTime,endTime,includeReservationAllocations等参数.
signalToContainer客户端用于请求ResourceManager向容器发出信号的接口
比如像container发送OUTPUT_THREAD_DUMP命令获取线程快照信息.
安全模式下,ResourceManager在application发送命令给container之前会验证权限信息. 用户需要具有权限
updateApplicationTimeouts更新application的超时时间. 需要提供格式时间,如果超时时间小于等于当前时间会抛出异常

在这里插入图片描述

  1. 用户提交预订(reservation)创建请求&#xff0c;并接收包含ReservationId的响应。
  2. 用户提交一个用RDL(Reservation Definition Language)表示的规格文件和ReservationId组成的预订请求。这描述了用户对资源随时间的需求&#xff08;例如&#xff0c;资源的阈值&#xff09;和时间约束&#xff08;例如&#xff0c;期限&#xff09;。这可以通过常用的Client-to-RM协议或通过RM的REST api以编程方式完成。如果使用相同的ReservationId提交预订&#xff0c;并且RDL相同&#xff0c;则不会创建新预留并且请求将成功。如果RDL不同&#xff0c;则将拒绝预留&#xff0c;并且请求将不成功。如果使用相同的ReservationId提交预订&#xff0c;并且RDL相同&#xff0c;则不会创建新预留但是请求将会成功。如果RDL不同&#xff0c;则将拒绝预留&#xff0c;并且请求将会不成功。
  3. ReservationSystem利用ReservationAgent&#xff08;图中的GREE&#xff09;启动一个客户端去查找合理的资源分配。一般是通过一个计划&#xff0c;计划是一个数据结构&#xff0c;描述了当前已经接受的保留和可获得的资源。
  4. SharingPolicy提供一种方式去接受或者拒绝预订。
  5. 成功验证后&#xff0c;ReservationSystem会向用户返回ReservationId。
    6.到一定时间&#xff0c;一个名为PlanFollower的新组件通过动态的创建/调整/销毁队列&#xff0c;将计划状态发布到调度程序。
  6. 用户可以提交一个作业到可预留队列&#xff0c;可以明确指定ReservationId作为ApplicationSubmissionContext的一部分。
  7. Scheduler将从创建的特殊队列中提供容器&#xff0c;以确保遵守资源预留。在预留的限制内&#xff0c;用户已经保证访问资源&#xff0c;在资源共享之上进行标准的Capacity/Fairness共享。
  8. 系统同时提供一个机制去适应集群容量下降的机制。

  • 集群/节点/配置信息

方法名称描述
getClusterMetricsclient 向ResourceManager获取集群信息
getClusterNodesclient 向ResourceManager获取集群所有节点的信息
getNodeToLabels获取node节点的label信息
getLabelsToNodes获取label的node 信息
getClusterNodeLabels获取集群中的label信息
getResourceProfiles获取特定资源配置[resource-profiles.json,]
getResourceTypeInfo获取特定资源配置
getAttributesToNodes获取attribute->node属性
getClusterNodeAttributes获取集群node属性
getNodesToAttributes获取集群node->attribute属性

  • 队列信息

方法名称描述
getQueueInfo获取集群队列信息[已使用/总容量/子队列/正在运行任务信息]
getQueueUserAclsClient从ResourceManager获取有关当前用户的队列ACL的信息

五.ResourceManagerAdministrationProtocol

Admin与RM之间的通信协议&#xff0c; Admin通过该RPC协议更新系统配置文件&#xff0c; 例如节点黑白名单等。ResourceManagerAdministrationProtocol继成了GetUserMappingsProtocol协议接口.
接口信息如下:
在这里插入图片描述

方法名称描述
refreshQueues刷新队列
refreshNodes刷新节点
refreshSuperUserGroupsConfiguration刷新配置
refreshUserToGroupsMappings刷新用户->用户组映射信息
refreshAdminAcls刷新Admin的ACL信息
refreshServiceAcls刷新服务级别信息&#xff08;SLA&#xff09;
updateNodeResource更新在RM端维护的RMNode资源信息
refreshNodesResources刷新node资源信息
addToClusterNodeLabels向集群中节点添加Label
removeFromClusterNodeLabels移除集群中节点Label
replaceLabelsOnNode替换集群中节点Label
checkForDecommissioningNodes检查停用的节点
refreshClusterMaxPriority刷新群集最大优先级
mapAttributesToNodes获取attribute -> node 信息

六.ResourceTracker

NM与RM之间的协议&#xff0c; NM通过该RPC协议向RM注册&#xff0c; 并定时发送心跳信息汇报当前节点的资源使用情况和Container运行情况 [ 服务端ResourceTrackerService : 8031端口]。

方法名称描述
nodeHeartbeat心跳信息
registerNodeManager注册NodeManager
unRegisterNodeManager解除注册NodeManager

七.ApplicationMasterProtocol

AM与RM之间的协议&#xff0c; AM通过该RPC协议向RM注册和撤销自己&#xff0c; 并为各个任务申请资源。

方法名称描述
registerApplicationMaster新的ApplicationMaster向RM注册
ApplicationMaster会提供RPC端口,url等信息给RM,响应信息会返回集群所能响应的最大资源能力
finishApplicationMasterApplicationMaster通知RM自己的状态为成功/失败
allocateApplicationMaster向ResourceManager申请资源/心跳

八.ContainerManagementProtocol

ContainerManagementProtocol&#xff1a; AM与NM之间的协议&#xff0c; AM通过该RPC要求NM启动或者停止Container&#xff0c; 获取各个Container的使用状态等信息。

方法名称描述
startContainers启动容器
stopContainers停止容器
getContainerStatuses获取容器状态
increaseContainersResource [废弃]增加容器资源
updateContainer更新容器
signalToContainer发送信号
localize本地化容器所需的资源,目前&#xff0c;此API仅适用于运行容器
reInitializeContainer使用新的 Launch Context 初始化容器
restartContainer重新启动容器
rollbackLastReInitialization尝试回滚最后一次重新初始化操作
commitLastReInitialization尝试提交最后一次初始化操作,如果提交成功则不可以回滚.

ContainerManagementProtocol协议主要提供了以下三个RPC函数 :
❑ startContainer&#xff1a; ApplicationMaster通过该RPC要求NodeManager启动一个Container。该函数有一个StartContainerRequest类型的参数&#xff0c; 封装了Container启动所需的本地资源、 环境变量、 执行命令、 Token等信息。 如果Container启动成功&#xff0c; 则该函数返回一个StartContainerResponse对象。
❑ stopContainer&#xff1a; ApplicationMaster通过该RPC要求NodeManager停止&#xff08; 杀死&#xff09; 一个Container。 该函数有一个StopContainerRequest类型的参数&#xff0c; 用于指定待杀死的ContainerID。 如果Container被成功杀死&#xff0c; 则该函数返回一个StopContainer-Response对象。
❑ getContainerStatus&#xff1a; ApplicationMaster通过该RPC获取一个Container的运行状态。 该函数参数类型为GetContainerStatusRequest&#xff0c; 封装了目标Container的ID&#xff0c; 返回值为封装了Container当前运行状态的类型为GetContainerStatusResponse的对象。


推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 如何提高PHP编程技能及推荐高级教程
    本文介绍了如何提高PHP编程技能的方法,推荐了一些高级教程。学习任何一种编程语言都需要长期的坚持和不懈的努力,本文提醒读者要有足够的耐心和时间投入。通过实践操作学习,可以更好地理解和掌握PHP语言的特异性,特别是单引号和双引号的用法。同时,本文也指出了只走马观花看整体而不深入学习的学习方式无法真正掌握这门语言,建议读者要从整体来考虑局部,培养大局观。最后,本文提醒读者完成一个像模像样的网站需要付出更多的努力和实践。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • Hadoop 源码学习笔记(4)Hdfs 数据读写流程分析
    Hdfs的数据模型在对读写流程进行分析之前,我们需要先对Hdfs的数据模型有一个简单的认知。数据模型如上图所示,在NameNode中有一个唯一的FSDirectory类负责维护文件 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文整理了Java中java.lang.NoSuchMethodError.getMessage()方法的一些代码示例,展示了NoSuchMethodErr ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
author-avatar
零泉爱
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有