
Monitoring Kafka consumer TPS with InfluxDB + Grafana


Since the supporting monitoring system is not yet complete, I wrote a simple page to monitor the TPS of Kafka message consumption.


InfluxDB installation



  • Download and install



wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.2.x86_64.rpm
sudo yum localinstall influxdb-1.6.2.x86_64.rpm

 



  • InfluxDB configuration

 

vi /etc/influxdb/influxdb.conf

 

 

These settings go under the [[graphite]] input section (the JMeter Backend Listener pushes data to this listener on port 2003):

[[graphite]]
enabled = true
database = "jmeter"
retention-policy = ""
bind-address = ":2003"
protocol = "tcp"
consistency-level = "one"
batch-size = 5000
batch-pending = 10
batch-timeout = "1s"
udp-read-buffer = 0
separator = "."


  • Starting InfluxDB

influxd -config /etc/influxdb/influxdb.conf


  • InfluxDB operations

Show all databases: show databases
Create a database: create database shhnwangjian
Create a user with admin privileges: CREATE USER "admin" WITH PASSWORD 'admin' WITH ALL PRIVILEGES
Drop a database: drop database shhnwangjian
Switch to a database: use shhnwangjian
Show all measurements: SHOW MEASUREMENTS
Show all tag keys of a measurement: show tag keys from disk_free
Show all field keys of a measurement: show field keys
Insert data: insert disk_free,hostname=server01 value=442221834240i 1435362189575692182
Drop a measurement: drop measurement disk_free
Insert data (another example): insert disk_free,hostname=server01 value=100 1613791502000000000
The order is: measurement name (created automatically if it does not exist), then the tag keys/values, then the field keys/values; note the space between the tag set and the field set.

 

Inserting data via the HTTP API:

curl -i -XPOST 'http://IP:PORT/write?db=jmeter' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
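The same write can be issued from Java with a small HTTP helper. The sampler in the data-ingestion section below calls HttpClient4.writeInfluxdb(url, body, null), which the original post does not include; the following is only a minimal sketch of such a helper (the class and method name are taken from that call, everything else is an assumption), using java.net.HttpURLConnection:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HttpClient4 {

    // POST one line-protocol record to InfluxDB, e.g.
    // writeInfluxdb("http://127.0.0.1:8086/write?db=jmeter", "tps,host=server01 value=120", null);
    // Returns the HTTP status code; InfluxDB answers 204 No Content on success.
    // The third argument is unused in this sketch.
    public static int writeInfluxdb(String url, String body, String unusedAuth) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }
}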

 


JMeter configuration



  1. Create a test plan and add a Backend Listener

  2. Set the InfluxDB IP and port

  3. Run the test and wait a few seconds

  4. Check whether JMeter produced any error logs

Port notes (the sketch after this list shows a quick way to verify the port-2003 listener):

  • Port 8086: the port Grafana uses to read data from the database

  • Port 2003: the port JMeter uses to push data into the database
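To check that the Graphite listener on port 2003 is accepting data, one can push a single test metric over TCP in the Graphite plaintext format (path value unix-seconds). This is a sketch only; the host and metric path are placeholders:

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class GraphitePortCheck {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";                      // placeholder: the InfluxDB host
        long now = System.currentTimeMillis() / 1000L;  // Graphite timestamps are unix seconds
        try (Socket socket = new Socket(host, 2003);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
            // with the [[graphite]] settings above, this point lands in the "jmeter" database
            out.write("jmeter.selftest.tps 1 " + now + "\n");
            out.flush();
        }
        System.out.println("sent one test point to " + host + ":2003");
    }
}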

 


Grafana configuration



  • Download and install

wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x86_64.rpm
sudo yum localinstall grafana-4.2.0-1.x86_64.rpm


  • Start

service grafana-server start


  • Open http://IP:3000/login in a browser to reach the Grafana home page



  • Create an InfluxDB data source



  • Create a dashboard

 

 

Query: SELECT mean("value") FROM "tps" WHERE $timeFilter GROUP BY time(1s), "host" fill(null)

$__interval: GROUP BY time(interval) aggregates the query result per interval; for example, with time(5m) (interval = 5m) the data are aggregated into 5-minute buckets.



  • Line-chart configuration

 

Use the Stack option to control whether the series are stacked (accumulated).


Data ingestion

The consumption monitoring is integrated into JMeter as a Java Sampler; start the consumer-monitoring thread group before starting the producer.

import lombok.SneakyThrows;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author weisy
 * @date 2021/02/22
 * @description JMeter Java Sampler that periodically sums the committed offsets of a
 *              Kafka consumer group and writes the consumption TPS to InfluxDB.
 */
public class KafkaMonitor extends AbstractJavaSamplerClient {

    private String kafkaIp;
    private String topic;
    private String consumer;        // consumer group name
    private String influxdbIp;
    private String influxdbName;
    private String measurement;
    private int sampleInterval;     // sampling interval in milliseconds
    private int loopTime;           // total run time in seconds

    @Override
    public Arguments getDefaultParameters() {
        Arguments params = new Arguments();
        params.addArgument("kafkaIp", "IP:PORT", "Kafka cluster address");
        params.addArgument("topic", "topic_name", "topic");
        params.addArgument("consumer", "group_name", "consumer group");
        params.addArgument("influxdbIp", "IP:PORT", "InfluxDB address");
        params.addArgument("influxdbName", "jmeter", "database name");
        params.addArgument("measurement", "tps", "measurement name");
        params.addArgument("sampleInterval", "0.5", "sampling interval / s");
        params.addArgument("loopTime", "60", "duration / s");
        return params;
    }

    /**
     * Called once per thread before the test starts; reads the parameters into fields.
     * The parameters could also be read in runTest; this just shows what setupTest is for.
     *
     * @param arg0
     */
    @Override
    public void setupTest(JavaSamplerContext arg0) {
        kafkaIp = arg0.getParameter("kafkaIp");
        topic = arg0.getParameter("topic");
        consumer = arg0.getParameter("consumer");
        influxdbIp = arg0.getParameter("influxdbIp");
        influxdbName = arg0.getParameter("influxdbName");
        measurement = arg0.getParameter("measurement");
        try {
            // the interval may be fractional (e.g. 0.5 s), so parse it as a double
            this.sampleInterval = (int) (Double.parseDouble(arg0.getParameter("sampleInterval")) * 1000);
        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
        try {
            this.loopTime = Integer.parseInt(arg0.getParameter("loopTime"));
        } catch (NumberFormatException e) {
            e.printStackTrace();
        }
    }

    /**
     * The actual sampling loop.
     *
     * @param arg0
     * @return
     */
    @SneakyThrows
    @Override
    public SampleResult runTest(JavaSamplerContext arg0) {
        SampleResult sr = new SampleResult();
        sr.sampleStart();
        long currentTime = System.currentTimeMillis();
        long endTime = currentTime + loopTime * 1000L;
        int tol = 0;    // total messages consumed since the first sample
        int num = 0;    // committed offset sum at the previous sample
        System.out.println("start time: " + currentTime);
        System.out.println("end time: " + endTime);
        while (currentTime < endTime) {
            currentTime = System.currentTimeMillis();
            int offsets = getPartitionsForTopic(topic, kafkaIp, consumer);
            System.out.println("sample time: " + currentTime + " consumed: " + (offsets - num));
            int tps = offsets - num;
            if (num != 0 && offsets - num != 0) {
                tol = offsets - num + tol;
                // one line-protocol record, e.g. "tps,host=server01,region=us-west value=120"
                String querybody = measurement + ",host=server01,region=us-west value=" + tps;
                String url = "http://" + influxdbIp + "/write?db=" + influxdbName;
                HttpClient4.writeInfluxdb(url, querybody, null);   // HTTP write helper, see the sketch above
            }
            num = offsets;
            Thread.sleep(sampleInterval);
        }
        sr.sampleEnd();
        sr.setSuccessful(true);
        return sr;
    }

    /**
     * Called once per thread after the test ends.
     *
     * @param arg0
     */
    @Override
    public void teardownTest(JavaSamplerContext arg0) {
    }

    private static Consumer<Long, String> createConsumer(String bootstrapServers, String group) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    /**
     * Returns the sum of the committed offsets of all partitions of the topic,
     * i.e. the total number of messages the consumer group has consumed so far.
     */
    public static int getPartitionsForTopic(String topic, String bootstrapServers, String group) {
        final Consumer<Long, String> consumer = createConsumer(bootstrapServers, group);
        AtomicInteger consumerNum = new AtomicInteger();
        Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        System.out.println("Get the partition info as below:");
        List<TopicPartition> tp = new ArrayList<>();
        partitionInfos.forEach(str -> {
            tp.add(new TopicPartition(topic, str.partition()));
            consumer.assign(tp);
            consumer.seekToEnd(tp);
            // sum the committed offset of every partition (may be null if nothing was committed yet)
            OffsetAndMetadata committed = consumer.committed(new TopicPartition(topic, str.partition()));
            if (committed != null) {
                consumerNum.set(consumerNum.get() + (int) committed.offset());
            }
        });
        consumer.close();   // a new consumer is created on every sample, so close it to avoid leaks
        return consumerNum.get();
    }

    /**
     * main method for testing the class locally; comment it out before packaging.
     *
     * @param args
     */
    public static void main(String[] args) {
        Arguments params = new Arguments();
        // set the parameters
        params.addArgument("kafkaIp", "IP:PORT", "Kafka cluster address");
        params.addArgument("topic", "topic_name", "topic");
        params.addArgument("consumer", "group_name", "consumer group");
        params.addArgument("influxdbIp", "IP:PORT", "InfluxDB address");
        params.addArgument("influxdbName", "jmeter", "database name");
        params.addArgument("measurement", "tps", "measurement name");
        params.addArgument("sampleInterval", "0.5", "sampling interval / s");
        params.addArgument("loopTime", "60", "duration / s");
        JavaSamplerContext arg0 = new JavaSamplerContext(params);
        KafkaMonitor test = new KafkaMonitor();
        test.setupTest(arg0);
        test.runTest(arg0);
        test.teardownTest(arg0);
    }
}

 


Final result

 

 

I also wrote a simple monitoring page of my own with Highcharts:

 

 

 

<html>
<head>
<meta charset="UTF-8" />
<title>Highcharts</title>
<script src="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>
<script src="http://code.highcharts.com/highcharts.js"></script>
</head>
<body>
<div id="container" style="width: 1000px; height: 800px; margin: 0 auto"></div>
<script language="Javascript">
$(document).ready(function() {
    var chart = {
        type: 'spline',
        animation: Highcharts.svg, // don't animate in IE
        marginRight: 10,
        events: {
            load: function () {
                // update the chart every second with the latest point from InfluxDB
                var series = this.series[0];
                setInterval(function () {
                    var res = showAdress(5);
                    if (res.series) {
                        var x = res.series[0].values[0][0];
                        var y = res.series[0].values[0][1];
                        series.addPoint([x, y], true, true);
                    }
                }, 1000);
            }
        }
    };
    var title = {
        text: 'Kafka consumption rate'
    };
    var xAxis = {
        type: 'datetime',
        tickPixelInterval: 150
    };
    var yAxis = {
        title: {
            text: 'TPS'
        },
        plotLines: [{
            value: 0,
            width: 1,
            color: '#808080'
        }]
    };
    var tooltip = {
        // format of the x and y values shown in the tooltip
        formatter: function () {
            return '<b>' + this.series.name + '</b><br/>' +
                Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
                Highcharts.numberFormat(this.y, 2);
        }
    };
    var plotOptions = {
        area: {
            pointStart: 1940,
            marker: {
                enabled: false,
                symbol: 'circle',
                radius: 2,
                states: {
                    hover: {
                        enabled: true
                    }
                }
            }
        }
    };
    var legend = {
        enabled: false
    };
    var exporting = {
        enabled: false
    };
    var series = [{
        name: 'consumption tps',
        data: (function () {
            // pre-fill the chart with the last 60 seconds of data from InfluxDB
            var data = [];
            var res = showAdress(60);
            if (res.series) {
                var values = res.series[0].values;
                // iterate over the values and push each [timestamp, value] pair as a point
                values.forEach(function (element, index, array) {
                    console.log(element, index);
                    data.push({
                        x: element[0],
                        y: element[1]
                    });
                });
            }
            console.log(res);
            return data;
        }())
    }];

    // query InfluxDB for the mean tps of the last `time` seconds
    function showAdress(time) {
        var flag = false; // set to true when the request succeeds
        var datas;
        $.ajax({
            url: "http://IP:PORT/query",
            dataType: "json",
            type: "get",
            async: false,
            data: {
                u: "admin",
                p: "admin",
                db: "jmeter",
                q: `SELECT mean("value") FROM "tps" WHERE time > now() - ${time}s GROUP BY time(1s) fill(null)`,
                // q: 'SELECT mean("value") FROM "tps" WHERE time > 1614757792613ms and time < 1614757896397ms GROUP BY time(1s) fill(null)',
                epoch: "ms"
            },
            success: function (res) {
                if (null != res) {
                    // datas = res.results[0].series[0].values;
                    datas = res.results[0];
                    flag = true;
                }
            },
            error: function () {
                alert('failed!');
            }
        });
        if (flag) {
            return datas;
        }
    }

    var json = {};
    json.chart = chart;
    json.title = title;
    json.tooltip = tooltip;
    json.xAxis = xAxis;
    json.yAxis = yAxis;
    json.legend = legend;
    json.exporting = exporting;
    json.series = series;
    json.plotOptions = plotOptions;
    Highcharts.setOptions({
        global: {
            useUTC: false
        }
    });
    $('#container').highcharts(json);
});
</script>
</body>
</html>

 


