Overall flow: Shipper -> Broker -> Indexer -> Elasticsearch (ES)
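As a rough illustration of that chain, the shipper end might look like the sketch below (the log path, Redis host and key are placeholder assumptions); an indexer-side counterpart appears near the end of the output section.
# Shipper sketch: tail local logs and push them to the Redis broker
input {
  file {
    path => ["/var/log/messages"]   # assumed path, adjust as needed
  }
}
output {
  redis {
    host => "127.0.0.1"             # assumed broker address
    data_type => "list"
    key => "logstash:redis"
  }
}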
1.input
input { stdin {} }
output {
stdout { codec => rubydebug }
}
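Running this and typing a line on stdin prints the event back through the rubydebug codec, roughly like the following (timestamp and host values are illustrative):
hello world
{
       "message" => "hello world",
      "@version" => "1",
    "@timestamp" => "2016-10-28T02:00:00.000Z",
          "host" => "localhost"
}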
input {
  file {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
    path => ["xx","xx"]
    exclude => "1.log"
    add_field => [ "log_ip", "xx" ]
    tags => ["tag1"]
    # delimiter that marks the end of a line (start of a new event)
    delimiter => "\n"
    # how often (in seconds) to scan the directory for new files
    discover_interval => 15
    # how often (in seconds) to check whether watched files have changed
    stat_interval => 1
    # where to start reading a file; the default is "end"
    start_position => "beginning"
    # where the read position (sincedb) is recorded
    sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt"
    # how often (in seconds) the read position is written to the sincedb file
    sincedb_write_interval => 15
  }
}
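To try these file-input options in isolation, a minimal sketch (the path below is an assumption) pairs the input with the rubydebug stdout output from above:
input {
  file {
    path => ["/var/log/test.log"]   # assumed path for illustration
    start_position => "beginning"
  }
}
output {
  stdout { codec => rubydebug }
}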
2.filter
filter {
  multiline {
    # merge rule: any line that does not start with a digit gets merged
    pattern => "^[^\d]"
    # merge target: the previous line
    what => "previous"
  }
}
filter {
  multiline {
    type => "type"                 # event type, self-explanatory
    pattern => "pattern, a regexp" # a regular expression, much like grep; lines that match are handled by "what" below
    negate => boolean
    what => "previous"             # or "next": merge a matching line into the previous event or the next one
  }
}
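As a concrete instance of the template above, a common idiom merges every line that does not start with a date into the previous event (the typical shape for multi-line stack traces); the date regex is an assumption about the log format:
filter {
  multiline {
    pattern => "^\d{4}-\d{2}-\d{2}"   # lines are expected to start with YYYY-MM-DD
    negate => true                    # act on lines that do NOT match
    what => "previous"                # glue them onto the previous event
  }
}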
filter {
  grep {
    match => [ "@message", "PHP Fatal error" ]
    drop => false
    add_tag => [fatal_error]
  }
  grep {
    tags => [fatal_error]
    match => [ "@message", ".*(xbox\.com|xbox\.mib\.com\.cn|supports\.game\.mib\.com\.cn)" ]
    drop => false
    add_tag => [xboxerror]
  }
}
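The grep filter is from the 1.x era; under a newer Logstash the same tagging could be sketched with conditionals plus mutate (this assumes the newer message field name rather than @message):
filter {
  if [message] =~ /PHP Fatal error/ {
    mutate { add_tag => ["fatal_error"] }
  }
  if "fatal_error" in [tags] and [message] =~ /xbox\.com|xbox\.mib\.com\.cn|supports\.game\.mib\.com\.cn/ {
    mutate { add_tag => ["xboxerror"] }
  }
}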
# Drop every log line except those whose message contains 5.3.3 or down
filter {
  if [message] !~ "5.3.3|down" {
    ruby {
      code => "event.cancel"
    }
  }
}
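The same effect can also be had with the built-in drop filter instead of a ruby block:
filter {
  if [message] !~ "5.3.3|down" {
    drop {}
  }
}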
# Use a built-in grok pattern to extract additional fields
filter {
  grok {
    match => {"message" => "%{COMBINEDAPACHELOG}"}
  }
}
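For reference, %{COMBINEDAPACHELOG} expects combined-format access lines like the illustrative one below and yields fields such as clientip, timestamp, verb, request, response, bytes, referrer and agent:
127.0.0.1 - frank [10/Oct/2016:13:55:36 +0800] "GET /index.html HTTP/1.1" 200 2326 "http://example.com/start.html" "Mozilla/5.0"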
# Merge lines that do not start with "[" into the previous event
filter {
  multiline {
    pattern => "^[^\[]"
    what => "previous"
  }
}
filter {
  if [path] =~ "error" {
    mutate { replace => { "type" => "apache_error" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}
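These conditionals key off the path field that the file input attaches to every event; a matching input might look like this (the paths are assumptions):
input {
  file {
    # [path] on each event lets the "access"/"error" branches above match
    path => ["/var/log/apache2/access.log", "/var/log/apache2/error.log"]
    start_position => "beginning"
  }
}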
3.output
Send email
output {
  email {
    match => [ "@message", "aaaaa" ]
    to => "storyskya@gmail.com"
    from => "monitor@mib.com.cn"
    options => [ "smtpIporHost", "smtp.mibnet.com",
                 "port", "25",
                 "userName", "monitor@mib.com.cn",
                 "starttls", "true",
                 "password", "opmonitor",
                 "authenticationType", "login"
    ]
    subject => "123"
    body => '123'
    via => "smtp"
  }
}
output {
  if [type] == "syslog" {
    elasticsearch {
      hosts => "172.16.0.102:9200"
      index => "syslog-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "nginx" {
    elasticsearch {
      hosts => "172.16.0.102:9200"
      index => "nglog-%{+YYYY.MM.dd}"
    }
  }
  # Send an email for any log whose message contains paramiko or simplejson
  if [message] =~ /paramiko|simplejson/ {
    email {
      to => "12222222@wo.cn"
      from => "good_zabbix@163.com"
      contenttype => "text/plain; charset=UTF-8"
      address => "smtp.163.com"
      username => "test@163.com"
      password => "12344"
      subject => "Abnormal log on server %{host}"
      body => "%{@timestamp} %{type}: %{message}"
    }
  }
}
output {
  stdout { codec => rubydebug }
  redis {
    host => '192.168.1.104'
    data_type => 'list'
    key => 'logstash:redis'
  }
}
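On the indexer side of the Shipper->Broker->Indexer->ES chain, a counterpart sketch pulls from that same Redis list and writes to Elasticsearch (the index name is an assumption; the host values reuse the ones above):
input {
  redis {
    host => "192.168.1.104"
    data_type => "list"
    key => "logstash:redis"
  }
}
output {
  elasticsearch {
    hosts => "172.16.0.102:9200"
    index => "logstash-%{+YYYY.MM.dd}"
  }
}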
output {
  elasticsearch { host => "localhost" }
  stdout { codec => rubydebug }
}
Substitution (gsub)
mutate {
  # older-style per-filter type flag: restricts this mutate to "phplog" events
  type => "phplog"
  # replace single quotes in @message with double quotes
  gsub => [ "@message","'", "\"" ]
}
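With 1.5-style conditionals the same substitution could be written as follows (using the newer message field name, an assumption about the event schema):
filter {
  if [type] == "phplog" {
    mutate {
      # replace single quotes with double quotes in the message
      gsub => [ "message", "'", "\"" ]
    }
  }
}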
Debugging
# /usr/local/logstash-1.5.2/bin/logstash -e 'input { stdin { } } output { stdout {} }'
logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
# logstash agent -f logstash-simple.conf --verbose    # enable verbose (debug) output
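It can also help to syntax-check a config file before starting the agent (the --configtest flag is from the 1.x series):
# /usr/local/logstash-1.5.2/bin/logstash -f logstash-simple.conf --configtest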
This article comes from the blog “要有梦想，万一实现了呢”; please retain this attribution when reposting: http://szgb17.blog.51cto.com/340201/1865408