Flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
架构
运行机制:
Flume 的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume 在删除自己缓存的数据。
核心的角色是 agent, agent 本身是一个 Java 进程, 一般运行在日志收集节点。 flume 采集系统就是由一个个 agent 所连接起来形成。
agent的三个组件:
Source:
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
Channel:
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel 以及 Kafka Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
Sink:
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。
安装:
1.解压
2.修改flume-env.sh
添加java环境变量
3.验证
flume-ng version
案例:
nc
netcat 源
1.flume配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 对当前agent的命名组件 a1:当前agent的名字 如果在同一节点有多个agent
# 需要区别开 source,sink,channel后边加s说明可能会有多个组件# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置当前的source 监听的节点和端口# Describe the sink
a1.sinks.k1.type = logger
# sink的类型是log# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# c1类型是内存级别 缓冲大小阈值单位:事件 一次传输的事件量# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 绑定source和sink到channel sink只能绑定一个channel 所以后面没有s
flume启动命令:
flume-ng agent --conf-file 配置文件 --name a1 -Dflume.root.logger=INFO,console
#agent:启动一个agent
#Dflume.root.logger=INFO,console 打印到控制台 不常用
flume启动后相当于开启了一个服务端
在另一个会话页面:
nc localhost 44444
相当于开启了一个客户端
此时在客户端输入就会在服务端以log形式打印到控制台
案例2
利用exec源监控某个文件
Exec Source在启动时运行给定的Unix命令,并期望进程在标准输出上产生连续的数据(除非属性logStdErr设置为true,否则stderr将被丢弃)。 如果进程由于任何原因退出,source也会退出,并且不会生成更多数据。
a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = exec
a1.sources.r1.command = tail -f a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
追加内容到要监控的文件
cat 2.txt >> flume.log
案例3:
flume-hdfs
flume要想将数据输出到hdfs,需要有hadoop相关jar包
flume官方手册
http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html
滚动文件:rollsize 设为 hdfs块大小 128mb
滚动文件夹:用处:一天滚动一个文件夹
可以配合hive分区 按天分区load数据就会很方便
a2.sources = r2
a2.sinks = k2
a2.channels = c2a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/test.log
a2.sources.r2.shell = /bin/bash -c #解析方式a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.56.20:9000/flume/%Y%m%d/%Ha2.sinks.k2.hdfs.round = true # 按照时间滚动文件夹
a2.sinks.k2.hdfs.roundValue = 1 # 多长时间创建一个新文件夹
a2.sinks.k2.hdfs.roundUnit = hour # 重新定义时间单位a2.sinks.k2.hdfs.useLocalTimeStamp = true # 使用本地时间戳a2.sinks.k2.hdfs.batchSize = 1000 # 积攒多少个Event flush到hdfs一次
a2.sinks.k2.hdfs.fileType = DataStream # 设置文件类型
a2.sinks.k2.hdfs.rollIntreval = 60 # 多久生成一个新文件 s
a2.sinks.k2.hdfs.rollSize = 134217700 # 文件滚动与Event数量无关 设置大小比一个hdfs块128MB稍小
a2.sinks.k2.hdfs.rollCount = 0 # 文件滚动与Event数量无关a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
案例4:
监控多个文件
spooldir source
通过此源,您可以通过将要摄取的文件放入磁盘上的“Spooling”目录中来摄取数据。该源将监视指定目录中的新文件,并从出现的新文件中解析事件。事件解析逻辑是可插入的。将给定文件完全读入通道后,将其重命名以指示完成(或选择删除)。
与Exec源不同,此源是可靠的,即使Flume重新启动或终止,它也不会丢失数据。为了获得这种可靠性,必须仅将不可变的唯一命名的文件放入Spooling目录中。Flume尝试检测这些问题情况,如果违反这些条件,将返回失败:
如果将文件放入Spooling目录后写入文件,Flume将在其日志文件中打印错误并停止处理。
如果以后再使用文件名,Flume将在其日志文件中打印错误并停止处理。
为避免上述问题,将唯一的标识符(例如时间戳)添加到日志文件名称(当它们移到Spooling目录中时)可能会很有用。
尽管有此来源的可靠性保证,但是在某些情况下,如果发生某些下游故障,则事件可能会重复。这与Flume其他组件提供的保证是一致的。
a2.sources = r2
a2.sinks = k2
a2.channels = c2a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume/upload a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.56.20:9000/flume/%Y%m%d/%Ha2.sinks.k2.hdfs.round = true # 按照时间滚动文件夹
a2.sinks.k2.hdfs.roundValue = 1 # 多长时间创建一个新文件夹
a2.sinks.k2.hdfs.roundUnit = hour # 重新定义时间单位a2.sinks.k2.hdfs.useLocalTimeStamp = true # 使用本地时间戳a2.sinks.k2.hdfs.batchSize = 1000 # 积攒多少个Event flush到hdfs一次
a2.sinks.k2.hdfs.fileType = DataStream # 设置文件类型
a2.sinks.k2.hdfs.rollIntreval = 60 # 多久生成一个新文件 s
a2.sinks.k2.hdfs.rollSize = 134217700 # 文件滚动与Event数量无关 设置大小比一个hdfs块128MB稍小
a2.sinks.k2.hdfs.rollCount = 0 # 文件滚动与Event数量无关a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
先上传 后改名为已读
但是不能动态监控变化的文件
案例5:
监控动态多文件
1.7版本 Talldir实现断点续传
在通过Flume收集日志的业务场景中,一般都会遇到下面的情况,在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_20151015_10.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_20151015_11.log的文件。
这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。
a1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = 第一个路径a1.sources.r1.positionFile = 路径
位置文件:
实现断点续传 json格式 inode(linux文件系统文件标识)记录了被监控文件位置信息