热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Hadoop2.xNodeManager启动之服务初始化

NM和RM类似,也是以服务的形式启动,但服务相对于RM来说少些,也经历服务初始化和服务启动两个阶段,NM继承自CompositeService。NM的服务初始化列表如下:

  NM和RM类似,也是以服务的形式启动,但服务相对于RM来说少些,也经历服务初始化和服务启动两个阶段,NM继承自CompositeService。NM的服务初始化列表如下:

bubuko.com,布布扣

NM的启动入口

public static void main(String[] args) {
//异常处理
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
//创建初始NM
NodeManager nodeManager = new NodeManager();
//载入控制文件
Configuration cOnf= new YarnConfiguration();
setHttpPolicy(conf);
//初始化并启动
nodeManager.initAndStartNodeManager(conf, false);
}下面进入initAndStartNodeManager函数,该函数主要包含两个功能:init和start

private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
try {
.....
.....
this.init(conf);//初始化
this.start();//启动
} catch (Throwable t) {
LOG.fatal("Error starting NodeManager", t);
System.exit(-1);
}
}服务初始化最终进入NM的serviceInit函数,该函数比较清晰的描述了NM服务用到的子服务

@Override
protected void serviceInit(Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
//容器令牌管理器
NMContainerTokenSecretManager cOntainerTokenSecretManager=
new NMContainerTokenSecretManager(conf);
//NM令牌管理
NMTokenSecretManagerInNM nmTokenSecretManager =
new NMTokenSecretManagerInNM();
//APP权限管理
this.aclsManager = new ApplicationACLsManager(conf);
//容器启动器,非常重要
ContainerExecutor exec = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
try {
exec.init();
} catch (IOException e) {
throw new YarnRuntimeException("Failed to initialize container executor", e);
}
//文件清理服务
DeletionService del = createDeletionService(exec);
addService(del);
// NodeManager level dispatcher
//事件分发器
this.dispatcher = new AsyncDispatcher();
//节点健康检查
nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
this.cOntext= createNMContext(containerTokenSecretManager,
nmTokenSecretManager);
//状态更新服务
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
//节点资源监控
NodeResourceMonitor nodeResourceMOnitor= createNodeResourceMonitor();
addService(nodeResourceMonitor);
//容器管理器
cOntainerManager=
createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler);
addService(containerManager);
((NMContext) context).setContainerManager(containerManager);
//web服务
WebServer webServer = createWebServer(context, containerManager
.getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
((NMContext) context).setWebServer(webServer);
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
//指标监控系统
DefaultMetricsSystem.initialize("NodeManager");
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
//循环初始化上述服务
super.serviceInit(conf);
// TODO add local dirs to del
}DeletionService服务:主要用于删除文件,该服务启动了一个线程池,默认核心数为4,有文件需要删除时会构建FileDeletionTask放入线程池

@Override
protected void serviceInit(Configuration conf) throws Exception {
//创建线程工厂
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("DeletionService #%d")
.build();
if (conf != null) {
//构建线程池
sched = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT),
tf);
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else {
sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
tf);
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS);
super.serviceInit(conf);
}

ContainerManager:容器管理器,相关参数yarn.nodemanager.sleep-delay-before-sigkill.ms yarn.nodemanager.process-kill-wait.ms,包含多个子服务,列表如下:

bubuko.com,布布扣

//ContainerManagerImpl.java
@Override
public void serviceInit(Configuration conf) throws Exception {
LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
waitForCOntainersOnShutdownMillis=
conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
//此处会启动各个子服务
super.serviceInit(conf);
}ContainerManager之ResourceLocalizationService:负责本地初始化,建立相应目录,相关参数yarn.nodemanager.localizer.cache.target-size-mb yarn.nodemanager.localizer.cache.cleanup.interval-ms 

@Override
public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
this.publicRsrc =
new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
try {
FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
//清理本地目录
cleanUpLocalDir(lfs,delService);
//建立初始化目录
List localDirs = dirsHandler.getLocalDirs();
for (String localDir : localDirs) {
// $local/usercache
Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
lfs.mkdir(userDir, null, true);
// $local/filecache
Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
lfs.mkdir(fileDir, null, true);
// $local/nmPrivate
Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
}
//日志目录创建
List logDirs = dirsHandler.getLogDirs();
for (String logDir : logDirs) {
lfs.mkdir(new Path(logDir), null, true);
}
} catch (IOException e) {
throw new YarnRuntimeException("Failed to initialize LocalizationService", e);
}
//初始化相关参数设置
cacheTargetSize =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) <<20;
cacheCleanupPeriod =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizatiOnServerAddress= conf.getSocketAddr(
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
super.serviceInit(conf);
}ContainerManager之containers-launcher:容器启动服务,初始化过程比较简单,如下

@Override
protected void serviceInit(Configuration conf) throws Exception {
try {
//TODO Is this required?
FileContext.getLocalFSFileContext(conf);
} catch (UnsupportedFileSystemException e) {
throw new YarnRuntimeException("Failed to start ContainersLauncher", e);
}
super.serviceInit(conf);
}ContainerManager之containers-monitor:容器状态监控,主要是内存合理性方面的校验

@Override
protected void serviceInit(Configuration conf) throws Exception {
//监控频率设定
this.mOnitoringInterval=
conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);
//资源计算类,可由yarn.nodemanager.container-monitor.resource-calculator.class指定
Class clazz =
conf.getClass(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, null,
ResourceCalculatorPlugin.class);
this.resourceCalculatorPlugin =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
LOG.info(" Using ResourceCalculatorPlugin : "
+ this.resourceCalculatorPlugin);
processTreeClass = conf.getClass(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
ResourceCalculatorProcessTree.class);
this.cOnf= conf;
LOG.info(" Using ResourceCalculatorProcessTree : "
+ this.processTreeClass);
//容器运行内存设置
long cOnfiguredPMemForContainers= conf.getLong(
YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
// Setting these irrespective of whether checks are enabled. Required in
// the UI.
// ///////// Physical memory configuration //////
//最大物理内存
this.maxPmemAllottedForCOntainers= configuredPMemForContainers;
// ///////// Virtual memory configuration //////
//最大虚拟内存计算
float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
Preconditions.checkArgument(vmemRatio > 0.99f,
YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
this.maxVmemAllottedForCOntainers=
(long) (vmemRatio * configuredPMemForContainers);
//以下都是内存检测操作,这些操作通过资源计算插件resourceCalculatorPlugin来实现
pmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
vmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
//物理内存计算
if (pmemCheckEnabled) {
// Logging if actual pmem cannot be determined.
long totalPhysicalMemoryOnNM= UNKNOWN_MEMORY_LIMIT;
if (this.resourceCalculatorPlugin != null) {
totalPhysicalMemoryOnNM= this.resourceCalculatorPlugin
.getPhysicalMemorySize();
if (totalPhysicalMemoryOnNM <= 0) {
LOG.warn("NodeManager‘s totalPmem could not be calculated. "
+ "Setting it to " + UNKNOWN_MEMORY_LIMIT);
totalPhysicalMemoryOnNM= UNKNOWN_MEMORY_LIMIT;
}
}
//最大使用内存超过物理内存的80%,则发出警告,因为这很有可能引起OOM
if (totalPhysicalMemoryOnNM != UNKNOWN_MEMORY_LIMIT &&
this.maxPmemAllottedForContainers > totalPhysicalMemoryOnNM * 0.80f) {
LOG.warn("NodeManager configured with "
+ TraditionalBinaryPrefix.long2String(maxPmemAllottedForContainers,
"", 1)
+ " physical memory allocated to containers, which is more than "
+ "80% of the total physical memory available ("
+ TraditionalBinaryPrefix.long2String(totalPhysicalMemoryOnNM, "",
1) + "). Thrashing might happen.");
}
}
super.serviceInit(conf);
}NodeStatusUpdater服务:

@Override
protected void serviceInit(Configuration conf) throws Exception {
//获得内存和虚拟CPU核心相关配置
int memoryMb =
conf.getInt(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.DEFAULT_NM_PMEM_MB);
float vMemToPMem =
conf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
int virtualMemoryMb = (int)Math.ceil(memoryMb * vMemToPMem);
int virtualCores =
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
//记录上述资源配置
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
//将资源记录至指标监控系统
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
this.minimumResourceManagerVersion = conf.get(
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
// Default duration to track stopped containers on nodemanager is 10Min.
// This should not be assigned very large value as it will remember all the
// containers stopped during that time.
//记录执行完毕容器的时间间隔,默认10分钟
duratiOnToTrackStoppedContainers=
conf.getLong(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
600000);
if (durationToTrackStoppedContainers <0) {
String message = "Invalid configuration for "
+ YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " default "
+ "value is 10Min(600000).";
LOG.error(message);
throw new YarnException(message);
}
if (LOG.isDebugEnabled()) {
LOG.debug(YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS + " :"
+ durationToTrackStoppedContainers);
}
super.serviceInit(conf);
LOG.info("Initialized nodemanager for " + nodeId + ":" +
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" virtual-cores=" + virtualCores);
}

Hadoop2.x NodeManager启动之服务初始化,布布扣,bubuko.com


推荐阅读
  • malloc 是 C 语言中的一个标准库函数,全称为 memory allocation,即动态内存分配。它用于在程序运行时申请一块指定大小的连续内存区域,并返回该区域的起始地址。当无法预先确定内存的具体位置时,可以通过 malloc 动态分配内存。 ... [详细]
  • 本文介绍了一种支付平台异步风控系统的架构模型,旨在为开发类似系统的工程师提供参考。 ... [详细]
  • Python多线程详解与示例
    本文介绍了Python中的多线程编程,包括僵尸进程和孤儿进程的概念,并提供了具体的代码示例。同时,详细解释了0号进程和1号进程在系统中的作用。 ... [详细]
  • 本文详细介绍了Linux系统中用于管理IPC(Inter-Process Communication)资源的两个重要命令:ipcs和ipcrm。通过这些命令,用户可以查看和删除系统中的消息队列、共享内存和信号量。 ... [详细]
  • NX二次开发:UFUN点收集器UF_UI_select_point_collection详解
    本文介绍了如何在NX中使用UFUN库进行点收集器的二次开发,包括必要的头文件包含、初始化和选择点集合的具体实现。 ... [详细]
  • 解决SQL Server数据库sa登录名无法连接的问题
    在安装SQL Server数据库后,使用Windows身份验证成功,但使用SQL Server身份验证时遇到问题。本文将介绍如何通过设置sa登录名的密码、启用登录名状态以及开启TCP协议来解决这一问题。 ... [详细]
  • 本文介绍了如何在 ASP.NET 中设置 Excel 单元格格式为文本,获取多个单元格区域并作为表头,以及进行单元格合并、赋值、格式设置等操作。 ... [详细]
  • LDAP服务器配置与管理
    本文介绍如何通过安装和配置SSSD服务来统一管理用户账户信息,并实现其他系统的登录调用。通过图形化交互界面配置LDAP服务器,确保用户账户信息的集中管理和安全访问。 ... [详细]
  • 本文详细介绍了Java代码分层的基本概念和常见分层模式,特别是MVC模式。同时探讨了不同项目需求下的分层策略,帮助读者更好地理解和应用Java分层思想。 ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • 基于iSCSI的SQL Server 2012群集测试(一)SQL群集安装
    一、测试需求介绍与准备公司计划服务器迁移过程计划同时上线SQLServer2012,引入SQLServer2012群集提高高可用性,需要对SQLServ ... [详细]
  • 网络爬虫的规范与限制
    本文探讨了网络爬虫引发的问题及其解决方案,重点介绍了Robots协议的作用和使用方法,旨在为网络爬虫的合理使用提供指导。 ... [详细]
  • [c++基础]STL
    cppfig15_10.cppincludeincludeusingnamespacestd;templatevoidprintVector(constvector&integer ... [详细]
  • 自动验证时页面显示问题的解决方法
    在使用自动验证功能时,页面未能正确显示错误信息。通过使用 `dump($info->getError())` 可以帮助诊断和解决问题。 ... [详细]
  • 使用 Git Rebase -i 合并多个提交
    在开发过程中,频繁的小改动往往会生成多个提交记录。为了保持代码仓库的整洁,我们可以使用 git rebase -i 命令将多个提交合并成一个。 ... [详细]
author-avatar
手机用户2502873943
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有