作者:小子少耍酷10 | 来源:互联网 | 2023-09-05 03:50
Flume的简介
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
◆ Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
◆ 一般的采集需求,通过对flume的简单配置即可实现
◆ Flume针对特殊场景也具备良好的自定义扩展能力,
因此,flume可以适用于大部分的日常数据采集场景
Flume的运行机制
1、 Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成
2、 每一个agent相当于一个数据传递员,内部有三个组件:
a) Source:采集组件,用于跟数据源对接,以获取数据
b) Channel:传输通道组件,用于从source将数据传递到sink
c) Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据
Flume的安装
Flume的安装在此不赘述.
Flume的简单测试(以CDH版为例)
1.需要在CDH的界面查看Flume的Agent代理名称,这个代理名称涉及后续conf文件的编辑,非常重要
步骤:CDH主界面->Flume->配置->代理名称
2.创建被监控文件以及配置文件的编写(以与kafka集成为例)
2.1:创建被监控的文件
在装有Flume的节点上创建文件
vi /opt/a.txt
2.2:需要在安装了Flume的节点上编写conf配置文件,配置文件的具体位置由自己定义,自己记得就好.
示例:vi /home/testFlume.conf(必须以.conf结尾)
文件内容如下,注意这里的tier1就是在CDH上查看到的代理名称,因人而异,有的是a1等等:
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1
tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /opt/a.txt
tier1.sources.r1.shell = /bin/sh -c
tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.topic = test
tier1.sinks.k1.brokerList = kafka1:9092,kafka2:9092
tier1.sinks.k1.requiredAcks = 1
tier1.sinks.k1.batchSize = 20
tier1.sinks.k1.channel = c1
tier1.channels.c1.type = memory
tier1.channels.c1.capacity = 1000000
tier1.channels.c1.transactiOnCapacity= 10000
tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1
2.3:在装有kafka的节点创建topic(与配置文件中保持一致)
kafka-topics --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 1 --partitions 1 --topic test
2.4:创建消费者
(从头开始消费)
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092 --from-beginning --topic test
(非从头开始消费)
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092 --topic test
2.5:在装有flume的节点启动flume
flume-ng agent -n tier1 -c /opt/cm-5.11.2/run/cloudera-scm-agent/process/767-flume-AGENT/ -f /home/testFlume.conf -Dflume.root.logger=INFO,console
注释:
tier1:是agent的代理名称
/opt/cm-5.11.2/run/cloudera-scm-agent/process/767-flume-AGENT/:是通过命令行 find / -name flume.conf 找到的(如果不是CDH版的找到flume的conf目录,一般是/opt/apache-flume-1.6.0-bin/conf/)
/home/testFlume.conf:自己编写的配置文件
2.6:向被监控的文件中追加数据
echo hello flume >>/opt/a.txt
2.7:观察装有kafka节点的消费者控制台是否hello flume
Flume数据丢失和数据重复
1.提高flume的版本,1.7版本之后,可以解决断点续传和数据重复问题
-----1.7版本的flume提供Taildir Source方式
它定期以JSON格式写入给定位置文件上每个文件的最后读取位置。如果Flume由于某种原因stop或down,它可以从文件position处重新开始tail。
具体的配置:
pro.sources = s1
pro.channels = c1
pro.sinks = k1
pro.sources.s1.type = TAILDIR
pro.sources.s1.positiOnFile= /home/dev/flume/flume-1.8.0/log/taildir_position.json
pro.sources.s1.filegroups = f1
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
pro.sources.s1.headers.f1.headerKey1 = aaa
pro.sources.s1.fileHeader = true
pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactiOnCapacity= 100
pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
pro.sinks.k1.kafka.topic = moercredit_log_test
pro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy
pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1
2.如果不是1.7版本的flume
将channels.type方式从memory改成file
memory的形式是基于内存缓冲数据
file的形式是设置检查点
两者的区别:
type选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据;type选择file时Channel的容错性更好,但是性能上会比memory channel差。
具体配置:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /xuyi/a.txt
a1.sources.r1.shell = /bin/sh -c
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = kafka1:9092,kafka2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1
#a1.channels.c1.type = memory
a1.channels.c1.type = file
#这两个文件夹最好手动创建
a1.channels.c1.checkpointDir = /opt/checkpointDir
a1.channels.c1.dataDirs = /opt/dataDirs
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactiOnCapacity= 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
参考地址: http://rdc.hundsun.com/portal/article/941.html https://blog.csdn.net/Abysscarry/article/details/89420560