热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka(三)——集群监控

任何应用功能再强大、性能再优越,如果没有与之匹配的监控,那么一切都是虚无缥缈的。监控不仅可以为应用提供运行时的数据作为依据参考,还可以迅速定位问题,提供预防及告警等功能,很大程度上增强了整体服务的鲁棒性。

一、Kafka监控指标与获取

Kafka监控的4个维度:

  • 集群信息
  • broker信息
  • topic信息
  • consumer group信息

使用JConsole访问JMX

(1)终端输入jconsole,启动Java监视和管理控制台。

(2)修改kafka-run-class.sh,使JConsole可以通过远程连接。

KAFKA_JMX_OPTS="

-Dcom.sun.management.jmxremote 

-Dcom.sun.management.jmxremote.authenticate=false  

-Dcom.sun.management.jmxremote.ssl=false 

-Djava.rmi.server.hostname=服务器的IP地址或者域名"

(3)修改kafka-server-start.sh,增加export JMX_PORT="9999"

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

(4)检查是否开启JMX

broker在启动过程中,始终会将JMX端口信息写入Kafka对应的位置.

Kafka(三) —— 集群监控

(5)连接

Kafka(三) —— 集群监控

(6)查看MBean

Kafka(三) —— 集群监控

MBean的名称,xxx.type=yyy,{attr} = zzz

其中xxx指的是组件名,如xxx = kafka.server
zzz 和 attr 指的是MBean的范围,例如topic = test,表示该MBean的作用范围是名为test的topic。

指标分类:

  • kafka.server 服务器端JMX指标
  • kafka.network 网络相关JMX指标
  • kafka.log 分区日志相关JMX指标
  • kafka.controller controller相关指标

使用Java程序访问JMX

(1)监控broker一分钟消息流入的速度

kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec

OneMinuteRate 表示某个broker一分钟消息流入的速度(messages/s)。

Kafka(三) —— 集群监控


public class KafkaJmxDemo {

    private MBeanServerConnection conn;

    private String jmxUrl;

    private String ipAndPort;

    public KafkaJmxDemo(String ipAndPort) {
        this.ipAndPort = ipAndPort;
    }

    /**
     * 初始化JMX连接
     *
     * @return
     */
    public boolean init() {
        jmxUrl = "service:jmx:rmi:///jndi/rmi://" + ipAndPort + "/jmxrmi";
        try {
            JMXServiceURL serviceURL = new JMXServiceURL(jmxUrl);
            JMXConnector cOnnector= JMXConnectorFactory.connect(serviceURL, null);
            cOnn= connector.getMBeanServerConnection();
            if (cOnn== null) {
                return false;
            }
        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }

    public double getMsgInPerSec() {
        String objectName = "kafka.server:type=BrokerTopicMetrics," +
                "name=MessagesInPerSec";
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            return (double) (Double) val;
        }
        return 0.0;
    }

    private Object getAttribute(String objName, String objAttr) {
        ObjectName objectName;
        try {
            objectName = new ObjectName(objName);
            return conn.getAttribute(objectName, objAttr);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        KafkaJmxDemo kafkaJmxDemo = new KafkaJmxDemo("127.0.0.1:9999");
        kafkaJmxDemo.init();
        System.out.println(kafkaJmxDemo.getMsgInPerSec());
    }

}


(2)获取指定Topic、指定分区的LEO值

Kafka(三) —— 集群监控


    public long getTopicPatitionLeo(String topic, int partition) {
        String objectName = "kafka.log:type=Log,name=LogEndOffset,topic=" + topic + ",partition=" + partition;
        Object val = getAttribute(objectName, "Value");
        if (val != null) {
            return (long) (Long) val;
        }
        return 0L;
    }

(3)监控指定Topic的消息流入的速度

Kafka(三) —— 集群监控


public double getBrokerTopicMetrics(String topic) {
        String objectName = "kafka.server:type=BrokerTopicMetrics," +
                "name=BytesInPerSec,topic=" + topic;
        Object val = getAttribute(objectName, "OneMinuteRate");
        if (val != null) {
            return (double) (Double) val;
        }
        return 0.0;
    }

输出

Kafka(三) —— 集群监控

Kafka重要监控参数

(1)消息入站、出站速率

## 入站速率
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec

## 出站速率
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
属性名 含义
Count broker处理过的总消息字节数
OneMinuteRate 统计过去1分钟内的消息速率
MeanRate 统计平均消息速率

二、监控系统kafka-manager

注意每一行后面不要留空格。

[repositories] 
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public/
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots

Add Cluster时,会提示异常。

Yikes! Ask timed out on [ActorSelection[Anchor(akka://kafka-manager-system/), Path(/user/kafka-manager)]] after [5000 ms]. Message of type [kafka.manager.model.ActorModel$KMAddCluster]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

Kafka(三) —— 集群监控

三、监控系统kafka-eagle

安装参考

官方参考文档:https://docs.kafka-eagle.org/

https://www.cnblogs.com/yinzhengjie/p/9957389.html

下载


wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.2.0.tar.gz

效果图

Kafka(三) —— 集群监控

Kafka(三) —— 集群监控

创建Topic

Kafka(三) —— 集群监控

监控Broker的消息出入站速率

Kafka(三) —— 集群监控

Kafka(三) —— 集群监控

参考文档

kafka-manager Github
Kafka集群管理工具kafka-manager的安装使用
kafka manager的使用,kafka manager页面参数说明
Kafka Manager几个指标含义

关于作者

后端程序员,五年开发经验,从事互联网金融方向。技术公众号「清泉白石」。如果您在阅读文章时有什么疑问或者发现文章的错误,欢迎在公众号里给我留言。

Kafka(三) —— 集群监控


推荐阅读
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • Spring – Bean Life Cycle
    Spring – Bean Life Cycle ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 多线程基础概览
    本文探讨了多线程的起源及其在现代编程中的重要性。线程的引入是为了增强进程的稳定性,确保一个进程的崩溃不会影响其他进程。而进程的存在则是为了保障操作系统的稳定运行,防止单一应用程序的错误导致整个系统的崩溃。线程作为进程的逻辑单元,多个线程共享同一CPU,需要合理调度以避免资源竞争。 ... [详细]
  • [转]doc,ppt,xls文件格式转PDF格式http:blog.csdn.netlee353086articledetails7920355确实好用。需要注意的是#import ... [详细]
  • 深入解析 Lifecycle 的实现原理
    本文将详细介绍 Android Jetpack 中 Lifecycle 组件的实现原理,帮助开发者更好地理解和使用 Lifecycle,避免常见的内存泄漏问题。 ... [详细]
  • Unity与MySQL连接过程中出现的新挑战及解决方案探析 ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • javascript分页类支持页码格式
    前端时间因为项目需要,要对一个产品下所有的附属图片进行分页显示,没考虑ajax一张张请求,所以干脆一次性全部把图片out,然 ... [详细]
  • 使用Jsoup解析并遍历HTML文档时,该库能够高效地生成一个清晰、规范的解析树,即使源HTML文档存在格式问题。Jsoup具备强大的容错能力,能够处理多种异常情况,如未闭合的标签等,确保解析结果的准确性和完整性。 ... [详细]
  • 如何使用 `org.opencb.opencga.core.results.VariantQueryResult.getSource()` 方法及其代码示例详解 ... [详细]
  • 本文将深入解析 Lumen 框架中的中间件机制,并提供实用的应用指南。我们将从官方文档出发,重点解读 5.3 版本中的相关内容,帮助开发者更好地理解和使用中间件功能。通过具体示例,探讨中间件在请求处理流程中的作用及其配置方法。 ... [详细]
author-avatar
爱电影麦兜兜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有