1. Logstash is a data collection engine, essentially an ETL tool. (The screenshots in this post come from the imooc course; respecting copyright starts with each of us.)
Logstash processes data in three stages: Input (data collection), Filter (data parsing and transformation), and Output (data emission).
2. The Pipeline concept in Logstash.
1) A Pipeline is the three-stage input-filter-output processing flow.
2) Queue management (see the sketch after this list).
3) Plugin lifecycle management.
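As a rough illustration of the queue-management point: by default Logstash buffers events between stages in an in-memory queue, which can be switched to a disk-backed (persistent) queue in logstash.yml. This is only a sketch; the size limit below is an example value, not a recommendation.

# logstash.yml -- persistent-queue sketch (example values)
queue.type: persisted        # default is "memory"; "persisted" buffers events on disk
queue.max_bytes: 1024mb      # upper bound on the disk space the queue may use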
3. The Logstash Event concept in Logstash.
1) The Event is the form data takes as it flows through Logstash internally. Once raw data enters via an Input, what moves between stages is not the raw data but a Logstash Event. A Logstash Event is a Java object that exposes an API for reading and modifying its internal fields.
2) Raw data is converted into an Event at the input stage, and the Event is converted into the target format at the output stage.
3) In the configuration file you can add, delete, modify, and read the fields of an Event, as sketched below.
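A minimal sketch of such field manipulation, assuming a mutate filter and made-up field names (env, secret) purely for illustration:

# Hypothetical filter block: field names are illustrative only
filter {
    mutate {
        add_field    => { "env" => "dev" }          # add a new field to the Event
        rename       => { "host" => "hostname" }    # rename an existing field
        update       => { "message" => "masked" }   # change the value of an existing field
        remove_field => [ "secret" ]                # drop a field from the Event
    }
}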
Regarding data flow: data enters through the Input and leaves through the Output. A Codec (coder/decoder) converts the raw Data into a Logstash Event on the way in, and converts the Logstash Event back into the Data format the target destination expects on the way out.
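To make the two codec roles concrete, here is a small sketch (not from the original post): the json codec on the input side decodes each incoming JSON line into Event fields, and the rubydebug codec on the output side encodes the Event in a human-readable form.

# Decoding: the json codec turns raw JSON text into Logstash Event fields
input {
    stdin {
        codec => json
    }
}

filter {}

# Encoding: the rubydebug codec renders the Event for human inspection
output {
    stdout {
        codec => rubydebug
    }
}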
4. Installing Logstash: upload the package to the server and extract it, as shown below:
tar -zxvf logstash-6.7.1.tar.gz -C /usr/local/soft/
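To confirm the installation works (a quick check, assuming the extraction path above), you can print the version or run a throwaway pipeline straight from the command line:

cd /usr/local/soft/logstash-6.7.1
bin/logstash --version
# throwaway pipeline: read from stdin, print events to stdout
bin/logstash -e 'input { stdin {} } output { stdout {} }'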
A simple Logstash example is shown below:
# Input: stdin reads from standard input; the line codec splits the data into one event per line
input {
    stdin {
        codec => line
    }
}

# Empty filter: no filtering
filter {}

# Output: stdout writes to standard output; the json codec encodes events as JSON
output {
    stdout {
        codec => json
    }
}
On the input side (Codec - Input Decoding), the line codec splits the raw data on each line and decodes every line into an Event; no filtering is applied.
On the output side (Codec - Output Encoding), the json codec encodes each Event as JSON before it is written to stdout.
The demo run looks as follows:
[elsearch@k8s-master logstash-6.7.1]$ echo "foo
> bar
> " | bin/logstash -f config/line-to-json.conf
Sending Logstash logs to /usr/local/soft/logstash-6.7.1/logs which is now configured via log4j2.properties
[2021-01-30T19:11:13,330][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/usr/local/soft/logstash-6.7.1/data/queue"}
[2021-01-30T19:11:13,357][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/local/soft/logstash-6.7.1/data/dead_letter_queue"}
[2021-01-30T19:11:14,189][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-01-30T19:11:14,258][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.7.1"}
[2021-01-30T19:11:14,396][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>"e449e4d8-b26e-49fa-b12d-3d452b50aac4", :path=>"/usr/local/soft/logstash-6.7.1/data/uuid"}
[2021-01-30T19:11:24,816][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-01-30T19:11:24,943][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#
[2021-01-30T19:11:25,080][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{"@version":"1","host":"k8s-master","message":"foo","@timestamp":"2021-01-30T11:11:25.063Z"}{"@version":"1","host":"k8s-master","message":"bar","@timestamp":"2021-01-30T11:11:25.102Z"}{"@version":"1","host":"k8s-master","message":"","@timestamp":"2021-01-30T11:11:25.102Z"}
[2021-01-30T19:11:25,394][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#
[2021-01-30T19:11:25,516][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[elsearch@k8s-master logstash-6.7.1]$