
[Original] Big Data Fundamentals with Hadoop (3): YARN Data Collection and Monitoring

Commonly used YARN REST APIs

1 metrics

# curl http://localhost:8088/ws/v1/cluster/metrics

The cluster metrics resource provides some overall metrics about the cluster. More detailed metrics should be retrieved from the jmx interface.

{
  "clusterMetrics":
  {
    "appsSubmitted":0,
    "appsCompleted":0,
    "appsPending":0,
    "appsRunning":0,
    "appsFailed":0,
    "appsKilled":0,
    "reservedMB":0,
    "availableMB":17408,
    "allocatedMB":0,
    "reservedVirtualCores":0,
    "availableVirtualCores":7,
    "allocatedVirtualCores":1,
    "containersAllocated":0,
    "containersReserved":0,
    "containersPending":0,
    "totalMB":17408,
    "totalVirtualCores":8,
    "totalNodes":1,
    "lostNodes":0,
    "unhealthyNodes":0,
    "decommissionedNodes":0,
    "rebootedNodes":0,
    "activeNodes":1
  }
}
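
For quick checks from the command line, individual gauges can be pulled out of this response with jq. The snippet below is a minimal sketch that assumes jq is installed and the ResourceManager web UI listens on the default port 8088:

# Assumes jq is installed; prints a few key gauges from the metrics endpoint
curl -s http://localhost:8088/ws/v1/cluster/metrics \
  | jq '.clusterMetrics | {appsRunning, appsPending, availableMB, allocatedMB, activeNodes}'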

2 scheduler

# curl http://localhost:8088/ws/v1/cluster/scheduler

A scheduler resource contains information about the current scheduler configured in a cluster. It currently supports both the Fifo and Capacity Scheduler. You will get different information depending on which scheduler is configured so be sure to look at the type information.

{
    "scheduler": {
        "schedulerInfo": {
            "capacity": 100.0, 
            "maxCapacity": 100.0, 
            "queueName": "root", 
            "queues": {
                "queue": [
                    {
                        "absoluteCapacity": 10.5, 
                        "absoluteMaxCapacity": 50.0, 
                        "absoluteUsedCapacity": 0.0, 
                        "capacity": 10.5, 
                        "maxCapacity": 50.0, 
                        "numApplications": 0, 
                        "queueName": "a", 
                        "queues": {
                            "queue": [
                                {
                                    "absoluteCapacity": 3.15, 
                                    "absoluteMaxCapacity": 25.0, 
                                    "absoluteUsedCapacity": 0.0, 
                                    "capacity": 30.000002, 
                                    "maxCapacity": 50.0, 
                                    "numApplications": 0, 
                                    "queueName": "a1",
...
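
To get a quick overview of the queue hierarchy without reading the full nested response, the queue names can be collected recursively with jq (again a sketch, assuming jq and the default port):

# Assumes jq is installed; lists every queueName found anywhere in the scheduler response
curl -s http://localhost:8088/ws/v1/cluster/scheduler \
  | jq '[.. | .queueName? // empty]'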

3 apps

# curl http://localhost:8088/ws/v1/cluster/apps

With the Applications API, you can obtain a collection of resources, each of which represents an application. When you run a GET operation on this resource, you obtain a collection of Application Objects.

Supported query parameters (see the example request after the list):

* state [deprecated] - state of the application
* states - applications matching the given application states, specified as a comma-separated list.
* finalStatus - the final status of the application - reported by the application itself
* user - user name
* queue - queue name
* limit - total number of app objects to be returned
* startedTimeBegin - applications with start time beginning with this time, specified in ms since epoch
* startedTimeEnd - applications with start time ending with this time, specified in ms since epoch
* finishedTimeBegin - applications with finish time beginning with this time, specified in ms since epoch
* finishedTimeEnd - applications with finish time ending with this time, specified in ms since epoch
* applicationTypes - applications matching the given application types, specified as a comma-separated list.
* applicationTags - applications matching any of the given application tags, specified as a comma-separated list.
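
As an illustration of the parameters above, the request below asks only for running MapReduce applications in a given queue and caps the number of results; the host, queue name and limit are placeholders:

# Example request combining several of the filters listed above (values are placeholders)
curl -s "http://localhost:8088/ws/v1/cluster/apps?states=RUNNING&applicationTypes=MAPREDUCE&queue=default&limit=10"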

 

{
  "apps":
  {
    "app":
    [
       {
          "finishedTime" : 1326815598530,
          "amContainerLogs" : "http://host.domain.com:8042/node/containerlogs/container_1326815542473_0001_01_000001",
          "trackingUI" : "History",
          "state" : "FINISHED",
          "user" : "user1",
          "id" : "application_1326815542473_0001",
          "clusterId" : 1326815542473,
          "finalStatus" : "SUCCEEDED",
          "amHostHttpAddress" : "host.domain.com:8042",
          "progress" : 100,
          "name" : "word count",
          "startedTime" : 1326815573334,
          "elapsedTime" : 25196,
          "diagnostics" : "",
          "trackingUrl" : "http://host.domain.com:8088/proxy/application_1326815542473_0001/jobhistory/job/job_1326815542473_1_1",
          "queue" : "default",
          "allocatedMB" : 0,
          "allocatedVCores" : 0,
          "runningContainers" : 0,
          "memorySeconds" : 151730,
          "vcoreSeconds" : 103
       },
       {
          "finishedTime" : 1326815789546,
          "amContainerLogs" : "http://host.domain.com:8042/node/containerlogs/container_1326815542473_0002_01_000001",
          "trackingUI" : "History",
          "state" : "FINISHED",
          "user" : "user1",
          "id" : "application_1326815542473_0002",
          "clusterId" : 1326815542473,
          "finalStatus" : "SUCCEEDED",
          "amHostHttpAddress" : "host.domain.com:8042",
          "progress" : 100,
          "name" : "Sleep job",
          "startedTime" : 1326815641380,
          "elapsedTime" : 148166,
          "diagnostics" : "",
          "trackingUrl" : "http://host.domain.com:8088/proxy/application_1326815542473_0002/jobhistory/job/job_1326815542473_2_2",
          "queue" : "default",
          "allocatedMB" : 0,
          "allocatedVCores" : 0,
          "runningContainers" : 1,
          "memorySeconds" : 640064,
          "vcoreSeconds" : 442
       } 
    ]
  }
}
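
For ad-hoc monitoring this response can also be aggregated directly on the command line; the sketch below (assuming jq) counts the running applications and sums their allocated memory:

# Assumes jq is installed; counts running apps and totals their allocatedMB
curl -s "http://localhost:8088/ws/v1/cluster/apps?states=RUNNING" \
  | jq '{runningApps: ([.apps.app[]?] | length), allocatedMB: ([.apps.app[]? | .allocatedMB] | add)}'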

 

Sample shell scripts for collection

metrics

#!/bin/bash

cluster_name="c1"
# ResourceManager hosts to try in order (e.g. an active/standby HA pair)
rms="192.168.0.1 192.168.0.2"

url_path="/ws/v1/cluster/metrics"
keyword="clusterMetrics"
log_name="metrics.log"

base_dir="/tmp"
log_path=${base_dir}/${log_name}

echo "`date +'%Y-%m-%d %H:%M:%S'`"
# Query each ResourceManager until one returns a response containing the keyword
for rm in $rms
do
        url="http://${rm}:8088${url_path}"
        echo $url
        content=`curl -s $url`
        echo $content
        if [[ "$content" == *"$keyword"* ]]; then
                break
        fi
done
if [[ "$content" == *"$keyword"* ]]; then
        # Drop the trailing "}", append currentTime and clusterName, then close the JSON object
        modified="${content:0:$((${#content}-1))},\"currentTime\":`date +%s`,\"clusterName\":\"${cluster_name}\"}"
        echo "$modified"
        echo "$modified" >> $log_path
else
        echo "gather metrics failed from : ${rms}, ${url_path}, ${keyword}"
fi
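
To turn this into a time series, the script can be scheduled with cron. The entry below is a sketch that assumes the script is saved as /opt/yarn-monitor/collect_metrics.sh (a hypothetical path) and runs it every minute:

# Hypothetical crontab entry: collect cluster metrics once per minute
* * * * * /bin/bash /opt/yarn-monitor/collect_metrics.sh >> /tmp/collect_metrics.cron.log 2>&1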

 

apps

#!/bin/bash

cluster_name="c1"
# ResourceManager hosts to try in order (e.g. an active/standby HA pair)
rms="192.168.0.1 192.168.0.2"

url_path="/ws/v1/cluster/apps?states=RUNNING"
keyword="apps"
log_name="apps.log"

base_dir="/tmp"
log_path=${base_dir}/${log_name}

echo "`date +'%Y-%m-%d %H:%M:%S'`"
# Query each ResourceManager until one returns a response containing the keyword
for rm in $rms
do
        url="http://${rm}:8088${url_path}"
        echo $url
        content=`curl -s $url`
        echo $content
        if [[ "$content" == *"$keyword"* ]]; then
                break
        fi
done
if [[ "$content" == *"$keyword"* ]]; then
        if [[ "$content" == *"application_"* ]]; then
                postfix=",\"currentTime\":`date +%s`,\"clusterName\":\"${cluster_name}\"}"
                # Strip the {"apps":{"app":[ prefix and the }]}} suffix, keeping only the app objects
                modified="${content:16:$((${#content}-20))}"
                # Split the array into one app object per line and append the postfix to each line
                echo "${modified//\"/\\\"}"|awk '{split($0,arr,"},"); for (i in arr) {print arr[i]}}'|xargs -i echo "{}$postfix" >> $log_path
        else
                echo "no apps are running"
        fi
else
        echo "gather apps failed from : ${rms}, ${url_path}, ${keyword}"
fi
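
Because each emitted line has to be a standalone JSON object for the Logstash json codec/filter below, it is worth verifying the log after a run; a minimal sketch, assuming jq is available:

# Check that every collected line parses as JSON (a "bad line" message means a malformed record)
while read -r line; do echo "$line" | jq -e . > /dev/null || echo "bad line: $line"; done < /tmp/apps.log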

 

The collected log files are then fed into ELK.

 

ELK

Logstash configuration examples

metrics, option 1: json codec in input + mutate rename in filter

input {
  file {
    path => "/tmp/metrics.log"
    codec => "json"
  }
}
filter {
  mutate {
    rename => { 
      "[clusterMetrics][appsSubmitted]" => "[appsSubmitted]"
      "[clusterMetrics][appsCompleted]" => "[appsCompleted]"
      "[clusterMetrics][appsPending]" => "[appsPending]"
      "[clusterMetrics][appsRunning]" => "[appsRunning]"
      "[clusterMetrics][appsFailed]" => "[appsFailed]"
      "[clusterMetrics][appsKilled]" => "[appsKilled]"
      "[clusterMetrics][reservedMB]" => "[reservedMB]"
      "[clusterMetrics][availableMB]" => "[availableMB]"
      "[clusterMetrics][allocatedMB]" => "[allocatedMB]"
      "[clusterMetrics][reservedVirtualCores]" => "[reservedVirtualCores]"
      "[clusterMetrics][availableVirtualCores]" => "[availableVirtualCores]"
      "[clusterMetrics][allocatedVirtualCores]" => "[allocatedVirtualCores]"
      "[clusterMetrics][containersAllocated]" => "[containersAllocated]"
      "[clusterMetrics][containersReserved]" => "[containersReserved]"
      "[clusterMetrics][containersPending]" => "[containersPending]"
      "[clusterMetrics][totalMB]" => "[totalMB]"
      "[clusterMetrics][totalVirtualCores]" => "[totalVirtualCores]"
      "[clusterMetrics][totalNodes]" => "[totalNodes]"
      "[clusterMetrics][lostNodes]" => "[lostNodes]"
      "[clusterMetrics][unhealthyNodes]" => "[unhealthyNodes]"
      "[clusterMetrics][decommissionedNodes]" => "[decommissionedNodes]"
      "[clusterMetrics][rebootedNodes]" => "[rebootedNodes]"
      "[clusterMetrics][activeNodes]" => "[activeNodes]"
    }
    remove_field => ["clusterMetrics", "path"]
  }
#  ruby {
#    code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('currentTime') + 28800))"
#  }
  date {
    match => [ "currentTime","UNIX"]
    target => "@timestamp"
  }
}

 

metrics, option 2: json filter + mutate add_field in filter

input {
  file {
    path => "/tmp/metrics.log"
  }
}
filter {
  json {
    source => "message"
  }
  mutate {
    add_field => {
      "appsSubmitted" => "%{[clusterMetrics][appsSubmitted]}"
      "appsCompleted" => "%{[clusterMetrics][appsCompleted]}"
      "appsPending" => "%{[clusterMetrics][appsPending]}"
      "appsRunning" => "%{[clusterMetrics][appsRunning]}"
      "appsFailed" => "%{[clusterMetrics][appsFailed]}"
      "appsKilled" => "%{[clusterMetrics][appsKilled]}"
      "reservedMB" => "%{[clusterMetrics][reservedMB]}"
      "availableMB" => "%{[clusterMetrics][availableMB]}"
      "allocatedMB" => "%{[clusterMetrics][allocatedMB]}"
      "reservedVirtualCores" => "%{[clusterMetrics][reservedVirtualCores]}"
      "availableVirtualCores" => "%{[clusterMetrics][availableVirtualCores]}"
      "allocatedVirtualCores" => "%{[clusterMetrics][allocatedVirtualCores]}"
      "containersAllocated" => "%{[clusterMetrics][containersAllocated]}"
      "containersReserved" => "%{[clusterMetrics][containersReserved]}"
      "containersPending" => "%{[clusterMetrics][containersPending]}"
      "totalMB" => "%{[clusterMetrics][totalMB]}"
      "totalVirtualCores" => "%{[clusterMetrics][totalVirtualCores]}"
      "totalNodes" => "%{[clusterMetrics][totalNodes]}"
      "lostNodes" => "%{[clusterMetrics][lostNodes]}"
      "unhealthyNodes" => "%{[clusterMetrics][unhealthyNodes]}"
      "decommissionedNodes" => "%{[clusterMetrics][decommissionedNodes]}"
      "rebootedNodes" => "%{[clusterMetrics][rebootedNodes]}"
      "activeNodes" => "%{[clusterMetrics][activeNodes]}"
    }
  }
  # mutate runs convert before the add_field common option, so the type
  # conversion has to live in a second mutate block to take effect
  mutate {
    convert => {
      "appsSubmitted" => "integer"
      "appsCompleted" => "integer"
      "appsPending" => "integer"
      "appsRunning" => "integer"
      "appsFailed" => "integer"
      "appsKilled" => "integer"
      "reservedMB" => "integer"
      "availableMB" => "integer"
      "allocatedMB" => "integer"
      "reservedVirtualCores" => "integer"
      "availableVirtualCores" => "integer"
      "allocatedVirtualCores" => "integer"
      "containersAllocated" => "integer"
      "containersReserved" => "integer"
      "containersPending" => "integer"
      "totalMB" => "integer"
      "totalVirtualCores" => "integer"
      "totalNodes" => "integer"
      "lostNodes" => "integer"
      "unhealthyNodes" => "integer"
      "decommissionedNodes" => "integer"
      "rebootedNodes" => "integer"
      "activeNodes" => "integer"
    }
    remove_field => ["message", "clusterMetrics", "path"]
  }
#  ruby {
#    code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('currentTime') + 28800))"
#  }
  date {
    match => [ "currentTime","UNIX"]
    target => "@timestamp"
  }
}

 

apps: json codec in input

input {
  file {
    path => "/tmp/apps.log"
    codec => "json"
  }
}
filter {
#  ruby {
#    code => "event.set('@timestamp', LogStash::Timestamp.at(event.get('currentTime') + 28800))"
#  }
  date {
    match => [ "currentTime","UNIX"]
    target => "@timestamp"
  }
}
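
Before relying on these pipelines, the configuration files can be syntax-checked with Logstash itself; the config path below is an assumption about where the file is saved:

# Assumed config file path; --config.test_and_exit validates the pipeline and exits without processing data
bin/logstash -f /etc/logstash/conf.d/yarn-apps.conf --config.test_and_exit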

 

Note:

The timestamp produced by the date plugin is in UTC.

1) If the data is stored in Elasticsearch and displayed with Kibana (which automatically shifts times to the browser's timezone), using UTC directly is fine;

2) If the data goes to some other store and you want to record local-time values directly, a timezone has to be specified; however, the date plugin's timezone option has no effect when the UNIX pattern is used (the timestamp is treated as UTC regardless), so the ruby plugin shown commented out above is used to shift the timezone instead;

"Unix timestamps (i.e. seconds since the epoch) are by definition always UTC and @timestamp is also always UTC. The timezone option indicates the timezone of the source timestamp, but doesn't really apply when the UNIX or UNIX_MS patterns are used."

Full list of timezones: http://joda-time.sourceforge.net/timezones.html

 

Kibana dashboard examples

 

References:

https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html

https://discuss.elastic.co/t/new-timestamp-using-dynamic-timezone-not-working/97166

 

