作者:企鹅 | 来源:互联网 | 2023-07-03 15:17
首先介绍一下什么是Filebeat+Kafka+ELK,ELK+Filebeat就是通过Filebeat收集数据,然后通过logstash发送到es上,然后直接发送的话,如果数据过
首先介绍一下什么是Filebeat+Kafka+ELK,ELK+Filebeat就是通过Filebeat收集数据,然后通过logstash发送到es上,然后直接发送的话,如果数据过大,或者遇到其他别的一些问题,在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。所以我们使用Kafka通过异步处理请求,从而缓解系统的压力。Kafka就是消息队列,把Filebeat收集的数据输出到Kafka,然后通过logstash把经过消息队列排序的消息有序的发往es储存,从而避免一个时间系统请求过多带来的系统崩盘。
一、环境准备
node1节点(内存4G):192.168.235.101 部署ES、Kibana、Filebeat
node2节点(内存4G):192.168.235.102 部署ES
apache节点:192.168.235.103 部署logstash、apache
zookeeper集群(部署zookeeper):
server1:192.168.235.104
server2:192.168.235.105
server3:192.168.235.106
关闭防火墙
以下实验建立在以上环境当中,已经全部完成,详情见之前博客。
二、安装kafka
在zookeeper集群中三台机子上均要安装
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/8be1ccb5166feb93.webp)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1e618/cd5/af17da15769ccb2e.jpeg)
修改配置文件
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/dc7ef30f57b727c7.jpeg)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/011ac27956d007f0.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/1e3db12dd78db092.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/5b97d3b808d031e2.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/617c1173853af4b6.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/4fae50aeee651818.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
修改环境变量
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/617c1173853af4b6.webp)
![在这里插入图片描述](https://img.php1.cn/3cd4a/18ace/696/1d8e759bd3e6bbec.jpeg)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/0d80e8a685a9a87b.png)
配置启动脚本
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/bcafc120671304eb.webp)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/bff2716168d1ed7b.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/6789f68dabde0aed.png)
创建topic
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/1113165c4904ecc5.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181
查看某个 topic 的详情
kafka-topics.sh --describe --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181
发布消息
kafka-console-producer.sh --broker-list 192.168.235.104:9092,192.168.235.105:9092,192.168.235.106:9092 --topic name1
消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.235.104:9092,192.168.235.105:9092,192.168.235.106:9092 --topic name1 --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
修改分区数
kafka-topics.sh --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181 --alter --topic name1 --partitions 6
删除 topic
kafka-topics.sh --delete --zookeeper 192.168.235.104:2181,192.168.235.105:2181,192.168.235.106:2181 --topic name1
三、Filebeat+Kafka+ELK
修改Filebeat配置文件
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/7d7ef3f69d479716.webp)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/d84f9786330d9e41.png?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/99b88427bc9ce0dc.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
启动 filebeat
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/dc7ef30f57b727c7.jpeg)
在logstash上新建一个配置文件
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/5287a7b3296ea13e.webp)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/ff61bfdd3c0af92e.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
input {
kafka {
bootstrap_servers => "192.168.235.104:9092,192.168.235.105:9092,192.168.235.106:9092"
topics => "filebeat_name1"
group_id => "name123"
auto_offset_reset => "earliest"
}
}
output {
elasticsearch {
hosts => ["192.168.235.101:9200"]
index => "filebeat-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
启动 logstash
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/d84f9786330d9e41.png)
登录 Kibana
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/8be1ccb5166feb93.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
![在这里插入图片描述](https://img.php1.cn/3cd4a/1eebe/cd5/e62700fe09f8933e.webp?,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81NTYwOTk1MQ==,size_16,color_FFFFFF,t_70)
可以查看到node1节点的信息,以上的操作是使用filebeat收集了node1的数据,传送到zookeeper集群,zookeeper这边是通过kafka接受,kafka作为一个消息队列,他可以对数据进行处理,然后传送给通过logstash传送到es中进行储存,最后通过kibana展示出来。