远程过程调用(Remote Procedure Call, RPC) 是一个计算机通信协议。 该协议允许运行于一台计算机的程序调用另一台计算机的子程序, 同时将网络的通信细节隐藏起来,而程序员无需额外地为这个交互作用编程。
作为一个分布式系统, Hadoop实现了自己的RPC通信协议, 它是上层多个分布式子系统(MR、 YARN、 HDFS等) 的公用网络通信模块, 保证其轻量级、 高性能。
典型的RPC框架, 主要包括以下几个部分。
( 1) 通信模块: 主要是请求-应答协议, 包括同步方式和异步方式。
①Stub程序: 客户端和服务端均包含Stub程序, 可将之看作代理程序。
②调度程序: 调度程序接收来自通信模块的请求消息, 并根据其中的标识选择一个Stub程序进行处理。
( 2) 客户程序/服务过程: 请求的发出者和请求的处理者。
请求过程为: 客户端程序—>客户端Stub程序—>通信模块—>远程请求—>通信模块—调度程序—服务器Stub程序—>服务程序
客户端:这里我们假设ConnectionId对应的Connection并不存在。在调用getConnection方法时,这里构造了Connection,由于入参ConnectionId.doPing一般为true,因此,在Connection的构造方法时,会构造相应的pingHeader写入到成员变量pingRequest中。接着将call加入到connection中后,调用了connection.setupIOstreams,这里一开始就调用了writeConnectionHeader,一共写了7个字节的内容到服务端(分别是"hrpc",Version、Service Class、AuthProtocol,显然,前面是四个字节,后面三个都是一个字节)。另外,由于这里的成员变量doPing为true,因此,这里使用PingInputStream封装了上面获取的输入流。该方法内接着调用了writeConnectionContext,该方法就是rpc的认证调用。这里的callId设置为CONNECTION_CONTEXT_CALL_ID,这里最后调用了request.write,将request中的信息写到了out中,注意,这里并没有调用其flush方法。但是,这里封装了一次请求。接着,调用了connection.sendRpcRequest,这里将请求的call继续写入到out中,并调用了flush方法。至此,客户端的流程也就完成了。
服务端:关于服务端的流程,我从Server.Listener.run讲起。这里一开始是阻塞的。当客户端调用了flush之后,这里的阻塞被打断。这里然后调用到了doAccept方法。在该方法中首先调用到了ConnectionManager.register,这里新构造了Connection并且将其加入到成员结合connections中。在Connection的构造中封装了客户端的channel、socket等相关信息。然后将新构造的connection作为attachment绑定到SelectionKey上,然后调用reader.addConnection将其加入到阻塞队列pendingConnections,并调用wakeup唤醒了reader在doRunLoop方法中的readSelector.select阻塞。而Server.Listener中的方法继续阻塞。
接下来,我们重点关注Server.Listener.Reader.doRunLoop。这里的readSelector.select阻塞打断后,由于此时的readSelector中并没有相关的selectedKeys,因此继续循环执行。接着从pendingConnections队列中取出封装了客户端相关信息的Connection,然后将Connection.channel注册到readSelector选择器上,由于客户端已经将信息flush过来,所以,这里的注册后的readSelector不会阻塞,且其中含有相应的selectedKeys,故,接下来会调用到doRead方法,接着调用到了Connection.readAndProcess。接下来另起一段着重讲解。
这里dataLengthBuffer.remaining()>0判断成立,调用channelRead将客户端的版本信息读入。接着将Version、Service Class、AuthProtocol读入到connectionHeaderBuf中,然后调用了dataLengthBuffer.flip,方便下次读取数据长度,并且将connectionHeaderRead置为true,也就是下一次遍历的时候不会再读取头部的相关信息。只是读取实际数据长度。然后,由于data == null,因此,这里为data分配空间,然后调用了processOneRpc,由于这里是封装的CONNECTION_CONTEXT_CALL_ID的call,因此,这里来到processRpcOutOfBandRequest方法,该方法完成了认证的相关流程。由于一开始connectionContextRead为false,而在processRpcOutOfBandRequest方法中被置为true,因此,这里会调用continue。这里重新读取的数据的长度,然后继续调用了processOneRpc。这一次callId不小于0,因此会调用到方法processRpcRequest。该方法是完成真正请求的,也就是说,前面的几次调用只是前期的校验。所谓真正的请求,也就是要调用服务端的相关方法。processRpcRequest方法将客户端的相关信息封装到Call,并将其加入到callQueue中。
另一方面,在Handler.run中一直在等待callQueue中成员的加入。通过其take方法可以看到内部的阻塞队列一直在轮询等待成员的加入。这里接着调用了CurCall.set(call),方便后面获取客户端的ugi。接着就调用到了RPC.Server.call,由于这里的Server继承自org.apache.hadoop.ipc.Server(该类是一个抽象类),而前面的Server覆写了后面Server的抽象方法call。最终调用了ProtobufRpcEngine.cal,该方法完成了服务端对应方法的调用,并且将结果返回。
另外,如果客户端在请求超时之后,会调用sendPing方法,用于测试服务端的服务是否仍然开启,此时的callId为PING_CALL_ID。
三.YARN RPC的主要组成协议(1) ApplicationClientProtocol: clients与RM之间的协议, JobClient通过该RPC协议提交应用程序、 查询应用程序状态等。
(2) ResourceManagerAdministrationProtocol: Admin与RM之间的通信协议, Admin通过该RPC协议更新系统配置文件, 例如节点黑白名单等。
(3) ApplicationMasterProtocol: AM与RM之间的协议, AM通过该RPC协议向RM注册和撤销自己, 并为各个任务申请资源。
(4) ContainerManagementProtocol: AM与NM之间的协议, AM通过该RPC要求NM启动或者停止Container, 获取各个Container的使用状态等信息。
(5) ResourceTracker: NM与RM之间的协议, NM通过该RPC协议向RM注册, 并定时发送心跳信息汇报当前节点的资源使用情况和Container运行情况。
clients与RM之间的协议, JobClient通过该RPC协议提交应用程序、 查询应用程序状态、 集群状态、 节点、队列和权限控制等。
方法名称 | 描述 |
---|---|
getNewApplication | client获得一个单调递增的ApplicationId用来提交Application . 响应信息中会包含集群的一些详细信息,比如集群中指定最大资源能力 |
submitApplication | client向ResourceManager提交Application. 客户端需要通过SubmitApplicationRequest提供诸如队列、运行ApplicationMaster所需的资源、用于启动ApplicationMaster的容器的数据量等详细信息 ResourceManager如果接受请求的话会立即返回一个<空的>SubmitApplicationResponse,如果拒绝则直接回抛出一个异常,然而 请求需要根据getApplicationReport方法确保application被正确的提交, 从ResourceManager获取SubmitApplicationResponse并不保证RM在故障转移或重新启动之后“记住”此应用程序。 如果ResourceManager在成功保存application之前发生了故障转义或者重启,getApplicationReport方法将会抛出一个ApplicationNotFoundException异常.当clinet在通过getApplicationReport获得一个ApplicationNotFoundException时,会以相同的ApplicationSubmissionContext配置重新提交application. 在提交 的过程中,会检查application 是否存在,如果application存在,将会返回SubmitApplicationResponse. 在安全模式下,ResouceManager会在接受应用程序提交之前验证对队列等的访问权限。 |
failApplicationAttempt | client用来请求ResourceManager使应用程序尝试失败 |
forceKillApplication | client请求ResourceManager终止已提交的Application |
moveApplicationAcrossQueues | 将一个application移动到另外一个队列 |
updateApplicationPriority | 更新application的优先级 |
资源预约
方法名称 | 描述 |
---|---|
getNewReservation | client请求ResourceManager获取ReservationId |
submitReservation | 提交reservation |
updateReservation | 更新reservation |
deleteReservation | 删除reservation |
listReservations | 根据筛选条件查询reservation列表信息. 可通过queue, reservationId, startTime,endTime,includeReservationAllocations等参数. |
signalToContainer | 客户端用于请求ResourceManager向容器发出信号的接口 比如像container发送OUTPUT_THREAD_DUMP命令获取线程快照信息. 安全模式下,ResourceManager在application发送命令给container之前会验证权限信息. 用户需要具有 |
updateApplicationTimeouts | 更新application的超时时间. 需要提供 |
方法名称 | 描述 |
---|---|
getClusterMetrics | client 向ResourceManager获取集群信息 |
getClusterNodes | client 向ResourceManager获取集群所有节点的信息 |
getNodeToLabels | 获取node节点的label信息 |
getLabelsToNodes | 获取label的node 信息 |
getClusterNodeLabels | 获取集群中的label信息 |
getResourceProfiles | 获取特定资源配置[resource-profiles.json,] |
getResourceTypeInfo | 获取特定资源配置 |
getAttributesToNodes | 获取attribute->node属性 |
getClusterNodeAttributes | 获取集群node属性 |
getNodesToAttributes | 获取集群node->attribute属性 |
方法名称 | 描述 |
---|---|
getQueueInfo | 获取集群队列信息[已使用/总容量/子队列/正在运行任务信息] |
getQueueUserAcls | Client从ResourceManager获取有关当前用户的队列ACL的信息 |
Admin与RM之间的通信协议&#xff0c; Admin通过该RPC协议更新系统配置文件&#xff0c; 例如节点黑白名单等。ResourceManagerAdministrationProtocol继成了GetUserMappingsProtocol协议接口.
接口信息如下:
方法名称 | 描述 |
---|---|
refreshQueues | 刷新队列 |
refreshNodes | 刷新节点 |
refreshSuperUserGroupsConfiguration | 刷新配置 |
refreshUserToGroupsMappings | 刷新用户->用户组映射信息 |
refreshAdminAcls | 刷新Admin的ACL信息 |
refreshServiceAcls | 刷新服务级别信息&#xff08;SLA&#xff09; |
updateNodeResource | 更新在RM端维护的RMNode资源信息 |
refreshNodesResources | 刷新node资源信息 |
addToClusterNodeLabels | 向集群中节点添加Label |
removeFromClusterNodeLabels | 移除集群中节点Label |
replaceLabelsOnNode | 替换集群中节点Label |
checkForDecommissioningNodes | 检查停用的节点 |
refreshClusterMaxPriority | 刷新群集最大优先级 |
mapAttributesToNodes | 获取attribute -> node 信息 |
NM与RM之间的协议&#xff0c; NM通过该RPC协议向RM注册&#xff0c; 并定时发送心跳信息汇报当前节点的资源使用情况和Container运行情况 [ 服务端ResourceTrackerService : 8031端口]。
方法名称 | 描述 |
---|---|
nodeHeartbeat | 心跳信息 |
registerNodeManager | 注册NodeManager |
unRegisterNodeManager | 解除注册NodeManager |
AM与RM之间的协议&#xff0c; AM通过该RPC协议向RM注册和撤销自己&#xff0c; 并为各个任务申请资源。
方法名称 | 描述 |
---|---|
registerApplicationMaster | 新的ApplicationMaster向RM注册 ApplicationMaster会提供RPC端口,url等信息给RM,响应信息会返回集群所能响应的最大资源能力 |
finishApplicationMaster | ApplicationMaster通知RM自己的状态为成功/失败 |
allocate | ApplicationMaster向ResourceManager申请资源/心跳 |
ContainerManagementProtocol&#xff1a; AM与NM之间的协议&#xff0c; AM通过该RPC要求NM启动或者停止Container&#xff0c; 获取各个Container的使用状态等信息。
方法名称 | 描述 |
---|---|
startContainers | 启动容器 |
stopContainers | 停止容器 |
getContainerStatuses | 获取容器状态 |
increaseContainersResource [废弃] | 增加容器资源 |
updateContainer | 更新容器 |
signalToContainer | 发送信号 |
localize | 本地化容器所需的资源,目前&#xff0c;此API仅适用于运行容器 |
reInitializeContainer | 使用新的 Launch Context 初始化容器 |
restartContainer | 重新启动容器 |
rollbackLastReInitialization | 尝试回滚最后一次重新初始化操作 |
commitLastReInitialization | 尝试提交最后一次初始化操作,如果提交成功则不可以回滚. |
ContainerManagementProtocol协议主要提供了以下三个RPC函数 :
❑ startContainer&#xff1a; ApplicationMaster通过该RPC要求NodeManager启动一个Container。该函数有一个StartContainerRequest类型的参数&#xff0c; 封装了Container启动所需的本地资源、 环境变量、 执行命令、 Token等信息。 如果Container启动成功&#xff0c; 则该函数返回一个StartContainerResponse对象。
❑ stopContainer&#xff1a; ApplicationMaster通过该RPC要求NodeManager停止&#xff08; 杀死&#xff09; 一个Container。 该函数有一个StopContainerRequest类型的参数&#xff0c; 用于指定待杀死的ContainerID。 如果Container被成功杀死&#xff0c; 则该函数返回一个StopContainer-Response对象。
❑ getContainerStatus&#xff1a; ApplicationMaster通过该RPC获取一个Container的运行状态。 该函数参数类型为GetContainerStatusRequest&#xff0c; 封装了目标Container的ID&#xff0c; 返回值为封装了Container当前运行状态的类型为GetContainerStatusResponse的对象。