作者:mobiledu2502910233 | 来源:互联网 | 2023-05-19 06:29
前言在平时工作中,我们需要对相关日志进行分析,随着平台的允许,日志会越来越大,不便于分析,此时我们需要将日志写入es,在这个过程中logstash起到中间转发的作用,类似于ETL工
前言
在平时工作中,我们需要对相关日志进行分析,随着平台的允许,日志会越来越大,不便于分析,此时我们需要将日志写入es,在这个过程中logstash起到中间转发的作用,类似于ETL工具。
1、搭建EL环境(此处没有使用Kibana)
(1)、安装es(5.6.16)
下载地址:https://elasticsearch.cn/download/
安装步骤:https://www.cnblogs.com/cq-yangzhou/p/9310431.html
(2)、安装logstash
下载地址:https://elasticsearch.cn/download/
安装步骤:解压即可。
(3)、安装IK分词器
下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases
安装步骤:解压,将里面的内容拷贝到es的plugins/ik(ik目录自己创建)目录下面,重启es即可
2、搭建springboot+logstash环境
(1)、引入logstash的mven依赖
net.logstash.logback
logstash-logback-encoder
5.1
(2)、编写logback-spring.xml放在resources目录下面
127.0.0.1:4567
(3)、编写日志类LogStashUtil
@Slf4j
public class LogStashUtil {
public static void sendMessage(String username, String type, String content,
Date createTime,String parameters){
JSONObject jsOnObject= new JSONObject();
jsonObject.putOpt("username",username);
jsonObject.putOpt("type",type);
jsonObject.putOpt("content",content);
jsonObject.putOpt("parameters",parameters);
jsonObject.putOpt("createTime", createTime);
log.info(jsonObject.toString());
}
}
(4)、编写logstash对应的日志收集配置文件
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4567
codec => json{
charset=>"UTF-8"
}
}
}
filter {
json {
source => "message"
#移除的字段,不会存入es
remove_field => ["message","port","thread_name","logger_name","@version","level_value","tags"]
}
date {
match => [ "createTime", "UNIX_MS" ]
target => "@timestamp"
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
date {
match => [ "createTime", "UNIX_MS" ]
target => "createTime"
}
ruby {
code => "event.set('createTime', event.get('createTime').time.localtime + 8*60*60)" #时间加8个小时
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "springboot-logstash-%{+YYYY.MM}"
document_type => access
#模板 指定生成索引的mapping模板,配置ik分词器
template_overwrite => true
template=> "F:/elk/logstash-5.6.16/template/logstash.json"
}
}
补充:模板编写
{
"template": "*",
"version": 00001,
"settings": {
"index.refresh_interval": "5s"
},
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": false
},
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false
}
}
},
{
"string_fields": {
"match": "*",
"match_mapping_type": "string",
"mapping": {
"type": "text",
"norms": false,
"analyzer": "ik_max_word",#只需要添加这一行即可设置分词器为ik_max_word
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
],
"properties": {
"@timestamp": {
"type": "date",
"include_in_all": false
},
"@version": {
"type": "keyword",
"include_in_all": false
}
}
}
}
}
(5)、启动es和logstash
1、启动es
进入es的bin目录
elasticsearch &
2、启动logstash
logstash -f message.conf(指定对应的日志配置文件路径) -d(后台运行)
(6)、单元测试,调用日志收集类LogStashUtil
@Test
public void contextLoads() {
LogStashUtil.sendMessage(
"admin","添加年度管理计划","测试",new Date(),"测试");
}
(7)、查看es中的结果