NM 与 RM 类似,也以服务的形式启动,只是包含的服务比 RM 少;启动过程同样分为服务初始化和服务启动两个阶段。NodeManager 继承自 CompositeService。下面从入口 main 函数开始,逐步梳理 NM 的服务初始化过程:
/**
 * Entry point of the NodeManager process.
 *
 * <p>Installs the YARN uncaught-exception handler, prints the startup message,
 * builds a {@code YarnConfiguration}, applies the HTTP policy to it and then
 * initializes and starts a new NodeManager.
 *
 * @param args command-line arguments (used here only for the startup message)
 */
public static void main(String[] args) {
  // Route uncaught exceptions from any thread to the YARN handler.
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);

  // Create the NodeManager service (not yet initialized).
  NodeManager nodeManager = new NodeManager();

  // Build the YARN configuration and apply the HTTP policy to it.
  Configuration conf = new YarnConfiguration();
  setHttpPolicy(conf);

  // Initialize and start; the second argument (hasToReboot) is false on a normal start.
  nodeManager.initAndStartNodeManager(conf, false);
}

下面进入initAndStartNodeManager函数,该函数主要包含两个功能:init和start
/**
 * Initializes and then starts this NodeManager. Any Throwable raised while
 * doing so is logged as fatal and the process exits with status -1.
 *
 * @param conf        configuration passed to {@code init}
 * @param hasToReboot whether this call is part of a reboot
 *                    (NOTE(review): it is only used in the elided lines below — confirm)
 */
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
  try {
    // NOTE(review): the "....." lines are code elided by the article author;
    // they are placeholders, not valid Java.
    .....
    .....
    this.init(conf);// initialization phase (ends up in serviceInit)
    this.start();// start phase
  } catch (Throwable t) {
    LOG.fatal("Error starting NodeManager", t);
    System.exit(-1);
  }
}

服务初始化最终进入NM的serviceInit函数,该函数比较清晰的描述了NM服务用到的子服务
/**
 * Creates the NodeManager's child services, registers them with this
 * composite service, and then lets the superclass initialize every
 * registered service.
 *
 * <p>The NodeStatusUpdater is registered last on purpose so that it is also
 * started last, i.e. after everything else is up and before registering
 * with the RM.
 *
 * @param conf NodeManager configuration
 * @throws YarnRuntimeException if the container executor fails to initialize
 * @throws Exception propagated from child-service initialization
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // NOTE(review): presumably makes dispatchers exit on error — confirm.
  conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

  // Secret managers for container tokens and NM tokens.
  NMContainerTokenSecretManager containerTokenSecretManager =
      new NMContainerTokenSecretManager(conf);
  NMTokenSecretManagerInNM nmTokenSecretManager =
      new NMTokenSecretManagerInNM();

  // Application ACL manager.
  this.aclsManager = new ApplicationACLsManager(conf);

  // Container executor: class taken from NM_CONTAINER_EXECUTOR,
  // DefaultContainerExecutor when unset. A central component of the NM.
  ContainerExecutor exec = ReflectionUtils.newInstance(
      conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
          DefaultContainerExecutor.class, ContainerExecutor.class), conf);
  try {
    exec.init();
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed to initialize container executor", e);
  }

  // File deletion service.
  DeletionService del = createDeletionService(exec);
  addService(del);

  // NodeManager level dispatcher
  this.dispatcher = new AsyncDispatcher();

  // Node health checker; it also provides the disk handler used below.
  nodeHealthChecker = new NodeHealthCheckerService();
  addService(nodeHealthChecker);
  dirsHandler = nodeHealthChecker.getDiskHandler();

  this.context = createNMContext(containerTokenSecretManager,
      nmTokenSecretManager);

  // Node status updater: created here, registered at the very end.
  nodeStatusUpdater =
      createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);

  // Node resource monitor.
  NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
  addService(nodeResourceMonitor);

  // Container manager.
  containerManager =
      createContainerManager(context, exec, del, nodeStatusUpdater,
          this.aclsManager, dirsHandler);
  addService(containerManager);
  ((NMContext) context).setContainerManager(containerManager);

  // Web server.
  WebServer webServer = createWebServer(context, containerManager
      .getContainersMonitor(), this.aclsManager, dirsHandler);
  addService(webServer);
  ((NMContext) context).setWebServer(webServer);

  // Route container-manager events to the container manager and
  // NodeManager events to this service.
  dispatcher.register(ContainerManagerEventType.class, containerManager);
  dispatcher.register(NodeManagerEventType.class, this);
  addService(dispatcher);

  // Metrics system.
  DefaultMetricsSystem.initialize("NodeManager");

  // StatusUpdater should be added last so that it get started last
  // so that we make sure everything is up before registering with RM.
  addService(nodeStatusUpdater);

  // Initialize every service registered above.
  super.serviceInit(conf);
  // TODO add local dirs to del
}

DeletionService服务:主要用于删除文件,该服务启动了一个线程池,默认核心数为4,有文件需要删除时会构建FileDeletionTask放入线程池
/**
 * Builds the scheduled thread pool that executes file-deletion tasks.
 *
 * <p>When {@code conf} is non-null, the pool size is read from
 * {@code NM_DELETE_THREAD_COUNT} and {@code debugDelay} from
 * {@code DEBUG_NM_DELETE_DELAY_SEC} (default 0). When {@code conf} is null,
 * the default thread count is used and {@code debugDelay} is left unchanged.
 *
 * @param conf NodeManager configuration; may be {@code null}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Worker threads are named "DeletionService #<n>".
  ThreadFactory namedThreads =
      new ThreadFactoryBuilder().setNameFormat("DeletionService #%d").build();

  int poolSize = (conf == null)
      ? YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT
      : conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
  sched = new ScheduledThreadPoolExecutor(poolSize, namedThreads);

  if (conf != null) {
    debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
  }

  // Do not run still-pending delayed deletions once the pool is shut down.
  sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
  sched.setKeepAliveTime(60L, SECONDS);
  super.serviceInit(conf);
}
ContainerManager(容器管理器):相关参数为 yarn.nodemanager.sleep-delay-before-sigkill.ms 和 yarn.nodemanager.process-kill-wait.ms。它包含多个子服务,初始化代码如下:
// ContainerManagerImpl.java
/**
 * Creates and registers the log handler, computes how long to wait for
 * containers during shutdown, and then initializes all child services.
 *
 * @param conf NodeManager configuration
 * @throws Exception propagated from child-service initialization
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  LogHandler logHandler =
      createLogHandler(conf, this.context, this.deletionService);
  // NOTE(review): presumably only added when the handler is itself a Service.
  addIfService(logHandler);
  dispatcher.register(LogHandlerEventType.class, logHandler);

  // Shutdown wait = delay before SIGKILL + process-kill wait + fixed slop.
  waitForContainersOnShutdownMillis =
      conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
          YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
      conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
          YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
      SHUTDOWN_CLEANUP_SLOP_MS;

  // Initialize each child service (starting them is a separate phase).
  super.serviceInit(conf);
}

ContainerManager之ResourceLocalizationService:负责本地初始化,建立相应目录,相关参数yarn.nodemanager.localizer.cache.target-size-mb yarn.nodemanager.localizer.cache.cleanup.interval-ms
/**
 * Initializes the localization service.
 *
 * <p>The steps are:
 * <ol>
 *   <li>Validate the configuration and create the public resource tracker.</li>
 *   <li>Clean up the local directories.</li>
 *   <li>Create usercache, filecache and nmPrivate under each local dir, and
 *       create each log dir.</li>
 *   <li>Read the cache and localizer settings.</li>
 *   <li>Register the localizer tracker.</li>
 * </ol>
 *
 * @param conf NodeManager configuration
 * @throws YarnRuntimeException if directory cleanup or creation fails
 * @throws Exception propagated from child-service initialization
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
  this.validateConf(conf);
  this.publicRsrc =
      new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
  this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);

  try {
    FileContext lfs = getLocalFileContext(conf);
    lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));

    // Clean up the local directories via the deletion service.
    cleanUpLocalDir(lfs, delService);

    // Create the per-local-dir layout.
    List<String> localDirs = dirsHandler.getLocalDirs();
    for (String localDir : localDirs) {
      // $local/usercache
      Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
      lfs.mkdir(userDir, null, true);
      // $local/filecache
      Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
      lfs.mkdir(fileDir, null, true);
      // $local/nmPrivate, created with NM_PRIVATE_PERM
      Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
      lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
    }

    // Create the log directories.
    List<String> logDirs = dirsHandler.getLogDirs();
    for (String logDir : logDirs) {
      lfs.mkdir(new Path(logDir), null, true);
    }
  } catch (IOException e) {
    throw new YarnRuntimeException("Failed to initialize LocalizationService", e);
  }

  // Cache target size is configured in MB; << 20 converts it to bytes.
  cacheTargetSize =
      conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB,
          YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
  cacheCleanupPeriod =
      conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS,
          YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
  localizationServerAddress = conf.getSocketAddr(
      YarnConfiguration.NM_LOCALIZER_ADDRESS,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
      YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);

  localizerTracker = createLocalizerTracker(conf);
  addService(localizerTracker);
  dispatcher.register(LocalizerEventType.class, localizerTracker);
  super.serviceInit(conf);
}

ContainerManager之containers-launcher:容器启动服务,初始化过程比较简单,如下
/**
 * Initializes the containers launcher.
 *
 * <p>The only work done here is obtaining the local file-system
 * {@code FileContext}. An {@code UnsupportedFileSystemException} from that
 * call is wrapped in a {@code YarnRuntimeException}.
 *
 * @param conf NodeManager configuration
 * @throws YarnRuntimeException if the local file-system context is unsupported
 * @throws Exception propagated from {@code super.serviceInit}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  try {
    //TODO Is this required?
    FileContext.getLocalFSFileContext(conf);
  } catch (UnsupportedFileSystemException e) {
    throw new YarnRuntimeException("Failed to start ContainersLauncher", e);
  }
  super.serviceInit(conf);
}

ContainerManager之containers-monitor:容器状态监控,主要是内存合理性方面的校验
/**
 * Reads the container-monitoring settings.
 *
 * <p>These are the sampling interval, the resource calculator plugin, the
 * process-tree class, and the physical and virtual memory limits for
 * containers. It then logs a warning if the physical limit exceeds 80% of
 * the node's detectable memory, and finally initializes child services.
 *
 * @param conf NodeManager configuration
 * @throws IllegalArgumentException if the vmem/pmem ratio is not greater than 0.99
 * @throws Exception propagated from {@code super.serviceInit}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Monitoring interval.
  this.monitoringInterval =
      conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
          YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);

  // Resource calculator; the class can be set with
  // yarn.nodemanager.container-monitor.resource-calculator.class.
  Class<? extends ResourceCalculatorPlugin> clazz =
      conf.getClass(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, null,
          ResourceCalculatorPlugin.class);
  this.resourceCalculatorPlugin =
      ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
  LOG.info(" Using ResourceCalculatorPlugin : "
      + this.resourceCalculatorPlugin);
  processTreeClass = conf.getClass(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
      ResourceCalculatorProcessTree.class);
  this.conf = conf;
  LOG.info(" Using ResourceCalculatorProcessTree : "
      + this.processTreeClass);

  // Memory for containers: NM_PMEM_MB converted from MB to bytes.
  long configuredPMemForContainers = conf.getLong(
      YarnConfiguration.NM_PMEM_MB,
      YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024L;

  // Setting these irrespective of whether checks are enabled. Required in
  // the UI.
  // ///////// Physical memory configuration //////
  // Maximum physical memory for containers.
  this.maxPmemAllottedForContainers = configuredPMemForContainers;

  // ///////// Virtual memory configuration //////
  // Maximum virtual memory = vmem/pmem ratio * physical limit.
  float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
      YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
  Preconditions.checkArgument(vmemRatio > 0.99f,
      YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
  this.maxVmemAllottedForContainers =
      (long) (vmemRatio * configuredPMemForContainers);

  // Memory-check switches.
  pmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
      YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
  vmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
      YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
  LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
  LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);

  if (pmemCheckEnabled) {
    // Ask the resource calculator plugin for the node's physical memory;
    // fall back to UNKNOWN_MEMORY_LIMIT when it is absent or reports <= 0.
    long totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
    if (this.resourceCalculatorPlugin != null) {
      totalPhysicalMemoryOnNM = this.resourceCalculatorPlugin
          .getPhysicalMemorySize();
      if (totalPhysicalMemoryOnNM <= 0) {
        LOG.warn("NodeManager's totalPmem could not be calculated. "
            + "Setting it to " + UNKNOWN_MEMORY_LIMIT);
        totalPhysicalMemoryOnNM = UNKNOWN_MEMORY_LIMIT;
      }
    }

    // Warn when more than 80% of the node's physical memory is allotted to
    // containers, since the node may start thrashing.
    if (totalPhysicalMemoryOnNM != UNKNOWN_MEMORY_LIMIT &&
        this.maxPmemAllottedForContainers > totalPhysicalMemoryOnNM * 0.80f) {
      LOG.warn("NodeManager configured with "
          + TraditionalBinaryPrefix.long2String(maxPmemAllottedForContainers,
              "", 1)
          + " physical memory allocated to containers, which is more than "
          + "80% of the total physical memory available ("
          + TraditionalBinaryPrefix.long2String(totalPhysicalMemoryOnNM, "",
              1) + "). Thrashing might happen.");
    }
  }
  super.serviceInit(conf);
}

NodeStatusUpdater服务:
/**
 * Initializes the node status updater.
 *
 * <p>It records the node's total resource (physical memory in MB and
 * virtual cores) in the metrics and reads the token keep-alive and removal
 * settings. It also reads the minimum RM version and how long stopped
 * containers are tracked. It then initializes child services and logs the
 * node's resources.
 *
 * @param conf NodeManager configuration
 * @throws YarnException if the stopped-container tracking duration is negative
 * @throws Exception propagated from {@code super.serviceInit}
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  // Memory and virtual-core settings.
  int memoryMb =
      conf.getInt(
          YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
  float vMemToPMem =
      conf.getFloat(
          YarnConfiguration.NM_VMEM_PMEM_RATIO,
          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
  // Virtual memory is only computed for the log line at the end.
  int virtualMemoryMb = (int) Math.ceil(memoryMb * vMemToPMem);
  int virtualCores =
      conf.getInt(
          YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);

  // Record the node's total resource and publish it to the metrics.
  this.totalResource = Resource.newInstance(memoryMb, virtualCores);
  metrics.addResource(totalResource);

  this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
  this.tokenRemovalDelayMs =
      conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);

  this.minimumResourceManagerVersion = conf.get(
      YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
      YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);

  // Default duration to track stopped containers on nodemanager is 10Min.
  // This should not be assigned very large value as it will remember all the
  // containers stopped during that time.
  durationToTrackStoppedContainers =
      conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
          600000);
  if (durationToTrackStoppedContainers < 0) {
    String message = "Invalid configuration for "
        + YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
        + "value is 10Min(600000).";
    LOG.error(message);
    throw new YarnException(message);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
        + durationToTrackStoppedContainers);
  }
  super.serviceInit(conf);
  LOG.info("Initialized nodemanager for " + nodeId + ":" +
      " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
      " virtual-cores=" + virtualCores);
}