在我之前的一篇文章 “Beats:使用 Linux 系统上的 Rsyslog 收集日志并导入 Elasticsearch”,我讲述了如何使用 Rsyslog 在 Linux 系统上收集日志并发送到 Elasticsearch 中。在那个解决方案里,我们配置 Rsyslog 为一个 client 模式,并发送日志到 Elasticsearch 中。我们在 Logstash 里对日志进行结构化。
在今天的文章中,我将使用另外一种方式。我们把 Rsyslog 同时配置为 server 及 client 模式。通过 server 模式,我可以把各个应用或其它系统的日志统一收集在一起,并通过 client 模式发送到 Logstash 中。我们甚至可以直接使用 Rsyslog 的配置直接使日志结构化。除非我们需要特殊的处理,否则我们可以甚至在 Logstash 中省去 filter 这个部分。
介绍理解组织生成的数百万条日志行可能是一项艰巨的挑战。 一方面,这些日志行提供了应用程序性能、服务器性能指标和安全性的视图。 另一方面,日志管理和分析可能非常耗时,这可能会阻碍这些日益必要的服务的采用。
rsyslog、Elasticsearch 和 Logstash 等开源软件提供了传输、转换和存储日志数据的工具。
在本教程中,你将学习如何创建一个集中的 rsyslog 服务器来存储来自多个系统的日志文件,然后使用 Logstash 将它们发送到 Elasticsearch 服务器。 从那里,你可以决定如何最好地分析数据。
本教程教你如何集中 syslog 生成或接收的日志,特别是称为 rsyslog 的变体。 Syslog 和基于 syslog 的工具(如 rsyslog)从内核和许多运行以保持类 UNIX 服务器运行的程序中收集重要信息。由于 syslog 是一个标准,而不仅仅是一个程序,因此许多软件项目都支持将数据发送到 syslog。通过集中这些数据,你可以更轻松地审核安全性、监控应用程序行为并跟踪其他重要的服务器信息。
然后,你可以从集中式或聚合 rsyslog 服务器将数据转发到 Logstash,后者可以在将日志数据发送到 Elasticsearch 之前进一步解析和丰富您的日志数据。
本教程的最终目标是:
在今天的练习中,我不准备进行安装描述。我希望你已经安装好自己的:
- 一个系统的 rsyslog 将被配置为 server 模式
- 其它系统的 rsyslog 将被配置为 client 模式
为了说明问题的方便,我在 Ubuntu 20.04 的 Linux 机器上部署上面部署 Elastic Stack 及 Rsyslog。在这个机器上的 Rsyslog 将被配置为服务器模式。我的机器的 IP 地址是 192.168.0.4。你可以通过如下的方式来获得:
ifconfig -a | grep 192
$ ifconfig -a | grep 192inet 192.168.0.4 netmask 255.255.255.0 broadcast 192.168.0.255
或者:
ifconfig -a
你可以使用其它的 Linux 机器部署 Rsyslog,并按照下面的方式来配置为 client 模式。它们的 syslog 日志将被发送到集中处理的 Rsyslog server 中,并最终通过 Logstash 导入到 Elastiicsearch 中。
在本节中,我们将 rsyslog-server 配置为能够在端口 514 上接收来自其他 syslog 服务器的数据的中央服务器。要将 rsyslog-server 配置为从其他 syslog 服务器接收数据,请在 rsyslog-server 上编辑 /etc/rsyslog.conf:
/etc/rsyslog.conf
# provides UDP syslog reception
module(load="imudp")
input(type="imudp" port="514")# provides TCP syslog reception
module(load="imtcp")
input(type="imtcp" port="514")
找到已经在 rsyslog.conf 中注释掉的这些行,并它们的注释取消掉。最终的结果就像上面显示的那样。这样,我们同时启动了 TCP 及 UDP 。
每个部分的第一行 module(load="imudp") 和 module(load="imtcp") 分别加载 imudp 和 imtcp 模块。 imudp 代表输入模块 udp,imtcp 代表输入模块 tcp。 这些模块侦听来自其他系统日志服务器的传入数据。
每个部分的第二行(input(type="imudp" port="514") 和 input(type="imtcp" port="514"))表明 rsyslog 应该为这些协议启动各自的 UDP 和 TCP 服务器,这些协议侦听端口 514(这是 syslog 默认端口)。
我们重新启动 rsyslog 服务:
sudo service rsyslog restart
我们可以通过如下的命令来检查 rsyslog 服务是否运行正常:
service rsyslog status
提示:你可以使用如下的命令来检查 rsyslog 的配置文件:
sudo rsyslogd -N1
你可以使用如下的命令来检查 rsyslog 服务的运行日志:
journalctl -u rsyslog
配置其它系统的 Rsyslog 发送日志到集中服务器
我们在其他的 Linux 机器把 Rsyslog 配置为 client 模式,并把该机器的日志发送到 Rsyslog server 里去。这个 server 也就是我们的集中处理及上传日志的服务。针对我的情况它的 IP 地址是 192.168.0.4。
在 Ubuntu 的安装中,我们找到路径 /etc/rsyslog.d:
# pwd
/etc/rsyslog.d
root@liuxgu:/etc/rsyslog.d# ls
20-ufw.conf 50-default.conf
我们修改 50-default.conf 这个文件。在文件顶部的 log by facility 之前添加以下行,将 private_ip_of_ryslog_server 替换为你的中央服务器的私有 IP:
*.* @private_ip_of_ryslog_server:514
行的第一部分 (.) 表示我们要发送所有消息。 虽然它超出了本教程的范围,但你可以将 rsyslog 配置为仅发送某些消息。 该行的其余部分解释了如何发送数据以及将数据发送到何处。 在我们的例子中,IP 地址前的 @ 符号告诉 rsyslog 使用 UDP 发送消息。 将此更改为@@ 以使用 TCP。 接下来是安装了 rsyslog 和 Logstash 的 rsyslog-server 的私有 IP 地址。 冒号后面的数字是要使用的端口号。
重新启动 rsyslog 以启用更改:
sudo service rsyslog restart
恭喜! 你现在正在将系统日志消息发送到中央服务器!
Elasticsearch 要求它接收到的所有文档都是 JSON 格式,而 rsyslog 提供了一种通过模板的方式来实现这一点的方法。
在这一步中,我们将配置我们的中央 rsyslog 服务器以使用 JSON 模板来格式化日志数据,然后将其发送到 Logstash,然后将其发送到 Elasticsearch。
回到 rsyslog-server 中央服务器,创建一个新的配置文件,在发送到 Logstash 之前将消息格式化为 JSON 格式:
sudo vi /etc/rsyslog.d/01-json-template.conf
完全按照所示将以下内容复制到文件中:
/etc/rsyslog.d/01-json-template.conf
template(name="json-template"type="list") {constant(value="{")constant(value="\"@timestamp\":\"") property(name="timereported" dateFormat="rfc3339")constant(value="\",\"@version\":\"1")constant(value="\",\"message\":\"") property(name="msg" format="json")constant(value="\",\"sysloghost\":\"") property(name="hostname")constant(value="\",\"severity\":\"") property(name="syslogseverity-text")constant(value="\",\"facility\":\"") property(name="syslogfacility-text")constant(value="\",\"programname\":\"") property(name="programname")constant(value="\",\"procid\":\"") property(name="procid")constant(value="\"}\n")
}
除了第一个和最后一个,请注意此模板生成的行在它们的开头有一个逗号。 这是为了维护 JSON 结构,并通过整齐地排列所有内容来帮助保持文件可读。 此模板以 Elasticsearch 和 Logstash 期望接收它们的方式格式化你的消息。 这是它们的样子:
{"@timestamp" : "2015-11-18T18:45:00Z","@version" : "1","message" : "Your syslog message here","sysloghost" : "hostname.example.com","severity" : "info","facility" : "daemon","programname" : "my_program","procid" : "1234"
}
提示:如果你想自定义日志数据,rsyslog.com 文档会显示 rsyslog 中可用的变量。 但是,你必须以 JSON 格式将其发送到 Logstash,然后再发送到 Elasticsearch。
正在发送的数据尚未使用此格式。 下一步显示配置服务器以使用此模板文件。
现在我们有了定义正确 JSON 格式的模板文件,让我们配置 rsyslog 中央服务器以将数据发送到 Logstash,它在本教程的同一台服务器上。
启动时,rsyslog 将查看 /etc/rsyslog.d 中的文件并从中创建其配置。 让我们添加我们自己的配置文件来扩展配置。
创建 /etc/rsyslog.d/60-output.conf:
sudo vi /etc/rsyslog.d/60-output.conf
将以下行复制到此文件:
# This line sends all lines to defined IP address at port 10514,
# using the "json-template" format template
*.* @private_ip_logstash:10514;json-template
开头的 *.* 表示处理所有日志消息的行的其余部分。 @ 符号表示使用 UDP(使用 @@ 代替使用 TCP)。 @ 后的 IP 地址或主机名是转发消息的位置。 在我们的例子中,我们使用 rsyslog 中央服务器的私有 IP 地址,因为 rsyslog 中央服务器和 Logstash 服务器安装在同一台服务器上。 这必须与您配置 Logstash 以在下一步中侦听的私有 IP 地址相匹配。
接下来是端口号。 本教程使用端口 50515。请注意,Logstash 服务器必须使用相同的协议侦听相同的端口。 最后一部分是我们的模板文件,它展示了如何在传递数据之前格式化数据。
不要重新启动 rsyslog。 首先,我们必须配置 Logstash 以接收消息。
我们首先按照好 Logstash,然后创建如下的配置文件:
/etc/logstash/conf.d/syslog.conf
# pwd
/etc/logstash/conf.d
root@liuxgu:/etc/logstash/conf.d# ls
syslog.conf
input {udp {host => "logstash_private_ip"port => 50515codec => "json"type => "rsyslog"}
}output {elasticsearch {hosts => ["https://elasticsearch_private_ip:9200"]user => elasticpassword => passwordssl_certificate_verification => truecacert => "/etc/logstash/config/certs/ca.crt"}# stdout { codec => rubydebug }
}
在上面,你必须根据自己时间的 IP 地址进行替换。在上面,我的 rsyslog 中央服务器及 Elasticsearch 都处于 192.168.0.4 这个 IP 地址的服务器上。如果你使用安全,请按照上面的方式配置好证书。否则在上面的配置中,安全的部分可以略去。在上面的配置中,我们并没有使用任何的 filter 来对数据进行任何的处理。我们在 rsyslog 中已经采用 JSON template 对数据进行了结构化处理。如果我们需要对数据更进一步处理,那么我们可以采用 filter 来操作。
根据定义,系统日志协议是 UDP,因此此配置反映了该标准。
在输入块中,通过将 logstash_private_ip 替换为 rsyslog 服务器的私有 IP 地址来设置 Logstash 主机地址,该服务器上也安装了 Logstash。
输入块将 Logstash 配置为侦听端口 50515,因此它不会与同一台机器上的 syslog 实例竞争。 小于 1024 的端口需要以 root 身份运行 Logstash,这不是一个好的安全实践。
请务必将 elasticsearch_private_ip 替换为您的 Elasticsearch 服务器的私有 IP 地址。 输出块显示了一个简单的条件配置。 它的目标是只允许匹配的事件通过。 在这种情况下,这只是“类型”为“rsyslog”的事件。
我们可以使用如下的命令来启动 Logstash:
sudo service logstash restart
或者使用如下的命令来重新启动 rsyslog:
sudo service rsyslog restart
提示:要对 Logstash 进行故障排除,请使用 sudo service logstash stop 停止服务并在前台使用详细消息运行它:
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf --verbose
它将包含常用信息,例如使用 Logstash 使用的 IP 地址和 UDP 端口进行验证:
Starting UDP listener {:address=>"192.168.0.4:50515", :level=>:info}
验证数据采集
我们打开 Kibana。我们需要为采集进来的数据创建一个叫做 logstash-* 的索引模式。我们可以在 Discover 中查看已经被导入的数据:
从上面,我们可以看出来显示的字段确实是我们使用 JSON template 进行处理的结构化数据。
我们可以使用 telnet 向中央 Rsyslog 服务器发送一些数据。我们通过 telnet 连接到中央 Rsyslog 服务器,并向它发送一些信息:
telnet 192.168.0.4 514
在上面,192.168.0.4 是我的 Rsyslog 中央服务器的地址。我们向它发送如下的日志信息:
<30>Aug 4 10:52:20 cooltest logstash[13329]: "type" &#61;> "rsyslog"
在上面&#xff0c;我们把 sysloghost 的值置为 nice。等一会儿&#xff0c;我们在 Kibana 的界面来进行搜索&#xff1a;
我们可以在 /var/log/syslog 中查看到最新生成的 log 信息&#xff1a;
在 Kibana 中&#xff0c;我们可以查看到如下的信息&#xff1a;
这说明&#xff0c;我们的 Rsyslog 中央服务器是正常工作的。
你的日志现在在 Elasticsearch 中。 下一步是什么&#xff1f; 考虑阅读 Kibana 可以做什么来可视化 Elasticsearch 中的数据&#xff0c;包括折线图和条形图、饼图、地图等。 Kibana 101&#xff1a;可视化入门解释了如何使用 Kibana Web 界面来搜索和可视化日志。