Log collection flow in this article:
- Filebeat is created as a DaemonSet in Kubernetes;
- Filebeat ships the logs to Kafka;
- Logstash consumes the logs received by Kafka;
- Logstash sends the logs to Elasticsearch (ES);
- Kibana pulls the logs from ES and displays them.
Apart from Filebeat, which runs inside Kubernetes, all other components are deployed directly on the host or with Docker.
1. Deploy Kafka
Reference: Docker 搭建 kafka (CSDN): https://blog.csdn.net/weixin_38367535/article/details/121103419. That article pulls the wurstmeister/zookeeper and wurstmeister/kafka images and runs them with docker run.
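Following that reference, a minimal sketch of the Zookeeper + Kafka setup (the Kafka environment variables and the 172.16.105.148 address used later in this article are assumptions; adjust them to your host):

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

# Zookeeper, with resource limits
docker run -d \
  --name zookeeper \
  -p 2181:2181 \
  --memory=1024m --cpus=1 \
  -t wurstmeister/zookeeper

# Kafka; the listener settings below are assumptions for a single-node setup
docker run -d \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=172.16.105.148:2181 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.105.148:9092 \
  -t wurstmeister/kafka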
2. Deploy Logstash, ES, and Kibana
Reference: ELK + Filebeat 7.13 (CSDN): https://blog.csdn.net/weixin_38367535/article/details/119183688. That article sets up a single-node 7.13 ELK stack on a CentOS 7.9 server with JDK 1.8, downloading the packages from artifacts.elastic.co.
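As a rough sketch of what that reference covers (the 7.13.0 version and x86_64 file names are assumptions; check the Elastic download pages for your platform):

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.13.0-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.13.0-linux-x86_64.tar.gz
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.13.0-linux-x86_64.tar.gz
tar -xzf elasticsearch-7.13.0-linux-x86_64.tar.gz
tar -xzf logstash-7.13.0-linux-x86_64.tar.gz
tar -xzf kibana-7.13.0-linux-x86_64.tar.gz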
3. Deploy Filebeat
Download the Filebeat manifest:
curl -L -O https://raw.githubusercontent.com/elastic/beats/master/deploy/kubernetes/filebeat-kubernetes.yaml
File contents:
# cat filebeat-kubernetes.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: container
      paths:
        - /var/log/containers/*.log
      processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"

    # To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
    #filebeat.autodiscover:
    #  providers:
    #    - type: kubernetes
    #      node: ${NODE_NAME}
    #      hints.enabled: true
    #      hints.default_config:
    #        type: container
    #        paths:
    #          - /var/log/containers/*${data.kubernetes.container.id}.log

    processors:
      - add_cloud_metadata:
      - add_host_metadata:

    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}

    output.kafka:
      enabled: true
      hosts: ["172.16.105.148:9092"]
      topic: 'k8s-uat-log'
      max_message_bytes: 5242880
      partition.round_robin:
        reachable_only: true
      keep-alive: 120
      required_acks: 1
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
spec:
  selector:
    matchLabels:
      k8s-app: filebeat
  template:
    metadata:
      labels:
        k8s-app: filebeat
    spec:
      serviceAccountName: filebeat
      terminationGracePeriodSeconds: 30
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - name: filebeat
        image: docker.elastic.co/beats/filebeat:7.4.2
        args: [
          "-c", "/etc/filebeat.yml",
          "-e",
        ]
        env:
        - name: ELASTICSEARCH_HOST
          value: elasticsearch
        - name: ELASTICSEARCH_PORT
          value: "9200"
        - name: ELASTICSEARCH_USERNAME
          value: elastic
        - name: ELASTICSEARCH_PASSWORD
          value: changeme
        - name: ELASTIC_CLOUD_ID
          value:
        - name: ELASTIC_CLOUD_AUTH
          value:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          runAsUser: 0
          # If using Red Hat OpenShift uncomment this:
          #privileged: true
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 100Mi
        volumeMounts:
        - name: config
          mountPath: /etc/filebeat.yml
          readOnly: true
          subPath: filebeat.yml
        - name: data
          mountPath: /usr/share/filebeat/data
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: varlog
          mountPath: /var/log
          readOnly: true
      volumes:
      - name: config
        configMap:
          defaultMode: 0640
          name: filebeat-config
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: varlog
        hostPath:
          path: /var/log
      # data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
      - name: data
        hostPath:
          # When filebeat runs as non-root user, this directory needs to be writable by group (g+w).
          path: /var/lib/filebeat-data
          type: DirectoryOrCreate
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: filebeat
  namespace: kube-system
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: Role
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: filebeat-kubeadm-config
  namespace: kube-system
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: Role
  name: filebeat-kubeadm-config
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: filebeat
  labels:
    k8s-app: filebeat
rules:
- apiGroups: [""] # "" indicates the core API group
  resources:
  - namespaces
  - pods
  - nodes
  verbs:
  - get
  - watch
  - list
- apiGroups: ["apps"]
  resources:
    - replicasets
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: filebeat
  # should be the namespace where filebeat is running
  namespace: kube-system
  labels:
    k8s-app: filebeat
rules:
  - apiGroups:
      - coordination.k8s.io
    resources:
      - leases
    verbs: ["get", "create", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: filebeat-kubeadm-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
rules:
  - apiGroups: [""]
    resources:
      - configmaps
    resourceNames:
      - kubeadm-config
    verbs: ["get"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
---
The file is left mostly unchanged. The main edit is the image version: when I downloaded the manifest it referenced Filebeat 8.0, which failed to pull, so I changed it to 7.4.2.
I also recommend setting a CPU limit on the pod; otherwise, if something goes wrong, Filebeat can consume a lot of CPU. See the sketch below.
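A minimal sketch of the resources block with a CPU limit added (the 500m value is an assumption; tune it for your nodes):

        resources:
          limits:
            cpu: 500m        # added: cap Filebeat's CPU usage (assumed value)
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 100Mi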
The other change is the output section: I removed the original output to ES and replaced it with Kafka:
output.kafka:
  enabled: true
  hosts: ["172.16.105.148:9092"]
  topic: 'k8s-uat-log'
  max_message_bytes: 5242880
  partition.round_robin:
    reachable_only: true
  keep-alive: 120
  required_acks: 1
Note that this topic must be created in Kafka beforehand.
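A sketch of creating the topic from inside the Kafka container (the partition count is an assumption; for a single-node Kafka use a replication factor of 1):

docker exec -it kafka bash
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic k8s-uat-log \
  --partitions 3 \
  --replication-factor 1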
Start the pods: kubectl apply -f filebeat-kubernetes.yaml
Once the pods are healthy, check their logs. Entries like 2021-11-03T03:47:39.954Z INFO [monitoring] log/log.go:145 Non-zero metrics in the last 30s {"monitoring": ... indicate that logs are being shipped to Kafka.
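To check pod status and logs (the namespace and label come from the manifest above):

kubectl -n kube-system get pods -l k8s-app=filebeat -o wide
kubectl -n kube-system logs -f <filebeat-pod-name>   # pick one pod from the output above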
Now let's check on the Kafka side:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic k8s-uat-log --from-beginning
Run this command on the Kafka server. If log messages scroll past rapidly, Kafka is receiving the logs from Filebeat; remember to press Ctrl+C promptly to stop the consumer.
4. Configure Logstash
# cat config/k8s-uat-log.conf
input {
  kafka {
    bootstrap_servers => ["172.16.105.148:9092"]
    client_id => "k8s-uat-log1"
    group_id => "host_log"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["k8s-uat-log"]
    type => "k8s-uat-log"
    codec => json
  }
}

filter {
  mutate {
    remove_field => ["@metadata","ecs","stream","input","log","pod-template-hash","uid","architecture","containerized","hostname","os","agent"]  # drop unwanted fields
  }
}

output {
  if [type] == "k8s-uat-log" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "k8s-uat-log-%{+YYYY.MM.dd}"
      timeout => 300
    }
  }
}
The input section reads from Kafka.
client_id: any custom identifier.
group_id: used to spread load across Logstash instances. If you start another Logstash instance to collect the same logs, it must use the same group_id but a different client_id (see the sketch after this list).
When multiple Logstash instances consume the same topic, make sure the Kafka topic has more than one partition, and do not run more Logstash instances than there are partitions.
auto_offset_reset => "latest": when there is no committed offset to resume from, start at the latest messages rather than the beginning, so a restarted Logstash does not re-consume the whole topic, wasting resources and duplicating data.
topics: the Kafka topic(s) to consume.
type: used for matching in the output section; different inputs can use different type values so the output can route them separately.
codec: parse the messages as JSON.
The filter section drops some fields; you can remove this block entirely, or adjust it to keep whatever fields you need.
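As an illustration of the group_id note above, a second, hypothetical Logstash instance could consume the same topic with the same group_id but a different client_id (the file name and client_id below are assumptions):

# config/k8s-uat-log-2.conf, a hypothetical second instance
input {
  kafka {
    bootstrap_servers => ["172.16.105.148:9092"]
    client_id => "k8s-uat-log2"   # must differ from the first instance
    group_id => "host_log"        # same group, so Kafka splits partitions between the instances
    topics => ["k8s-uat-log"]
    type => "k8s-uat-log"
    codec => json
  }
}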
Start Logstash.
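A minimal sketch, assuming Logstash runs from its installation directory with the config file at config/k8s-uat-log.conf as above:

# optionally validate the config first
bin/logstash -f config/k8s-uat-log.conf --config.test_and_exit
# then start it
bin/logstash -f config/k8s-uat-log.conf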
When the Logstash log shows text like "to the committed offset FetchPosition{offset=104940", it has started successfully; this offset is the field related to the auto_offset_reset => "latest" setting described above.
5. Check the Results
Open Kibana and add an index pattern. If typing k8s-uat auto-matches the dated indices, Logstash has already delivered the logs to ES. Once the index pattern is configured, the log collection pipeline is complete.
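If you prefer to verify on the ES side first, a quick check (assuming ES listens on 127.0.0.1:9200 without authentication, as in the Logstash output above) is:

curl -s 'http://127.0.0.1:9200/_cat/indices?v' | grep k8s-uat-log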