@Override
public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
        return new YARNRunner(conf);
    }
    return null;
}
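ClientProtocol currently has two implementations, YARNRunner and LocalJobRunner. LocalJobRunner (used when mapreduce.framework.name is local) runs MapReduce locally, which makes it convenient for debugging a program. YARNRunner submits the job to YARN.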
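The selection rule above can be sketched without Hadoop on the classpath. This is a minimal sketch, not the Hadoop implementation: Protocol, Provider, and ProviderSelectionSketch are hypothetical stand-ins, and the hard-coded provider list is an assumption made for illustration. The only behavior taken from the code above is that a provider returns null unless mapreduce.framework.name matches its framework.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ClientProtocol; not the Hadoop interface.
interface Protocol {
    String name();
}

// Hypothetical stand-in for a provider. It returns null when it does not
// handle the configured framework, mirroring create(conf) above.
interface Provider {
    Protocol create(Map<String, String> conf);
}

class ProviderSelectionSketch {
    static final String FRAMEWORK_NAME = "mapreduce.framework.name";

    // Builds a provider that only answers for one framework name.
    static Provider providerFor(String framework, String protocolName) {
        return conf -> {
            if (framework.equals(conf.get(FRAMEWORK_NAME))) {
                return () -> protocolName;
            }
            return null;
        };
    }

    // Assumption: a fixed list of two providers, one per implementation.
    static final List<Provider> PROVIDERS = Arrays.asList(
            providerFor("local", "LocalJobRunner"),
            providerFor("yarn", "YARNRunner"));

    // Asks each provider in turn and returns the first non-null protocol,
    // or null when no provider accepts the configuration.
    static Protocol select(Map<String, String> conf) {
        for (Provider provider : PROVIDERS) {
            Protocol protocol = provider.create(conf);
            if (protocol != null) {
                return protocol;
            }
        }
        return null;
    }
}
```

Calling ProviderSelectionSketch.select with a map that sets mapreduce.framework.name to yarn yields the protocol named YARNRunner. The sketch applies no default value, so an empty map yields null.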
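When YARNRunner is initialized, it establishes an RPC connection to the ResourceManager (port 8032 by default). The protocol that actually talks to the RM is ClientRMProtocol. Every operation between the client and the RM is sent through YARNRunner's member variable rmClient (a ClientRMProtocol), for example killApplication, getNodeReports, getJobCounters, and so on.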
public synchronized void start() {
    YarnRPC rpc = YarnRPC.create(getConfig());
    this.rmClient = (ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, getConfig());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Connecting to ResourceManager at " + rmAddress);
    }
    super.start();
}
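Once the Cluster class has been initialized, the next step is to create an Application. The client first contacts the RM to request a new Application (getNewApplication) and receives a GetNewApplicationResponse, which wraps the ApplicationId together with the minimum and maximum Resource capability that the RM can provide.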
public interface GetNewApplicationResponse {
    public abstract ApplicationId getApplicationId();
    public Resource getMinimumResourceCapability();
    public Resource getMaximumResourceCapability();
    public void setMaximumResourceCapability(Resource capability);
}
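Resource defines a set of cluster compute resources. For now only memory and CPU are included. CPU here means virtual cores: one physical core can be abstracted as several virtual cores, so the mapping is not one to one.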
public abstract class Resource implements Comparable<Resource> {
    public abstract int getMemory();
    public abstract void setMemory(int memory);
    public abstract int getVirtualCores();
    public abstract void setVirtualCores(int vCores);
}
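One way a client might use the minimum and maximum capability from GetNewApplicationResponse is to sanity-check a request before submitting it. The following is a minimal sketch under stated assumptions: SimpleResource and CapabilityCheckSketch are hypothetical names, not Hadoop classes, and treating memory as megabytes is an assumption of this example.

```java
// Hypothetical value class mirroring the two dimensions of Resource above.
// Assumption: memory is expressed in megabytes.
final class SimpleResource {
    final int memoryMb;
    final int virtualCores;

    SimpleResource(int memoryMb, int virtualCores) {
        this.memoryMb = memoryMb;
        this.virtualCores = virtualCores;
    }
}

class CapabilityCheckSketch {
    // Returns true when both memory and virtual cores of the request lie
    // within the inclusive range [min, max] advertised by the cluster.
    static boolean fits(SimpleResource request, SimpleResource min, SimpleResource max) {
        boolean memoryOk = request.memoryMb >= min.memoryMb
                && request.memoryMb <= max.memoryMb;
        boolean coresOk = request.virtualCores >= min.virtualCores
                && request.virtualCores <= max.virtualCores;
        return memoryOk && coresOk;
    }
}
```

With a minimum of (1024, 1) and a maximum of (8192, 4), a request of (2048, 2) fits, while a request of (16384, 2) does not because its memory exceeds the maximum.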
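Next, an ApplicationSubmissionContext has to be built. It contains the information needed to launch the MR AM, such as the path of the job's staging directory on HDFS (job.xml, job.split, job.splitmetainfo, libjars, files, archives, and so on), the user's UGI information, and the secure tokens. Once the context has been built, resMgrDelegate.submitApplication(appContext) is called.
The submitJob method of YARNRunner: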
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
        throws IOException, InterruptedException {
    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
        createApplicationSubmissionContext(conf, jobSubmitDir, ts);
    // Submit to ResourceManager
    ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
    ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
    String diagnostics = (appMaster == null ?
        "application report is null" : appMaster.getDiagnostics());
    // Treat a missing report, or a FAILED or KILLED application, as a failed submission
    if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
            || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
}
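The check that submitJob performs after submission can be isolated and exercised on its own. The following is a minimal sketch, assuming hypothetical stand-ins (AppState, AppReport, SubmissionCheckSketch) in place of YarnApplicationState and ApplicationReport. The enum lists only the states this example needs, and the condition and message follow the method above.

```java
import java.io.IOException;

// Hypothetical stand-in for YarnApplicationState; only the states used here.
enum AppState { RUNNING, FAILED, KILLED }

// Hypothetical stand-in for ApplicationReport.
final class AppReport {
    final AppState state;
    final String diagnostics;

    AppReport(AppState state, String diagnostics) {
        this.state = state;
        this.diagnostics = diagnostics;
    }
}

class SubmissionCheckSketch {
    // Throws when the report is missing or the application has already
    // reached FAILED or KILLED; returns normally otherwise.
    static void ensureSubmitted(AppReport report) throws IOException {
        String diagnostics = (report == null)
                ? "application report is null"
                : report.diagnostics;
        if (report == null
                || report.state == AppState.FAILED
                || report.state == AppState.KILLED) {
            throw new IOException("Failed to run job : " + diagnostics);
        }
    }
}
```

Passing a report in the RUNNING state returns normally. Passing null, or a report in the FAILED or KILLED state, raises an IOException that carries the diagnostics text.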
Finally, the job's status information is obtained through the getJobStatus method:
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
    TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =
    recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
JobReport report = ((GetJobReportResponse) invoke("getJobReport",
    GetJobReportRequest.class, request)).getJobReport();