目录
Flume概述
应用场景
Flume优势
Flume架构
Flume与Kafka对比
Flume+Kafka双剑合璧构建大数据平台日志采集
Flume安装配置
flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一。apache Flume是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制。flume具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到HDFS,HBase等集中存储器中。
比如我们在做一个电子商务网站,然后我们想从消费用户中访问点特定的节点区域来分析消费者的行为或者购买意图. 这样我们就可以更加快速的将他想要的推送到界面上,实现这一点,我们需要将获取到的她访问的页面以及点击的产品数据等日志数据信息收集并移交给Hadoop平台上去分析。而Flume正是帮我们做到这一点。现在流行的内容推送,比如广告定点投放以及新闻私人定制也是基于次,不过不一定是使用FLume,毕竟优秀的产品很多,比如facebook的Scribe,还有Apache新出的另一个明星项目chukwa,还有淘宝Time Tunnel。
1. Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase(本次设计理念:HDFS存储最原始采集到的数据,hdfs存储清洗过的数据)
2.当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供一共平稳的数据。
3.提供上下文路由特征。
4.Flume的管道是基于事务,保证了数据在传送和接收时的一致性。
5.Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的。
Flume具有以下特征:
1.Flume可以高效率的将多个网站服务器中收集的日志信息存入HDFS/HBase中。
2.使用Flume,我们可以将从多个服务器中获取的数据迅速的移交给Hadoop中。
3.除了日志信息,Flume同时也可以用来接入收集规模宏大的社交网络节点事件数据,比如facebook,twitter,电商网站如亚马
逊,flipkart等。
4.支持各种接入资源数据的类型以及接出数据类型。
5.支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等。
6.可以被水平扩展。
1)外部结构
上图所示,数据发生器(如:facebook,twitter)产生的数据被被单个的运行在数据发生器所在服务器上的agent所收集,之后数据收容器从各个agent上汇集数据并将采集到的数据存入到HDFS或者HBase中。
2)Flume 事件--event
事件作为Flume内部数据传输的最基本单元.它是由一个转载数据的字节数组(该数据组是从数据源接入点传入,并传输给传输器,也就是HDFS/HBase)和一个可选头部构成。典型的Flume 事件如下面结构所示:
我们在将event在私人定制插件时比如:flume-hbase-sink插件是,获取的就是event然后对其解析,并依据情况做过滤等,然后在传输给HBase或者HDFS。
flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume再删除自己缓存的数据。 在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录),如下所以:
其中event信息就是flume收集到的日记记录。
3)Flume Agent
我们在了解了Flume的外部结构之后,知道了Flume内部有一个或者多个Agent,然而对于每一个Agent来说,它就是一个独立的守护进程(JVM),它从客户端那儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传给下一个目的节点sink,或者agent。如图所示flume的基本模型 :
flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent,agent本身是一个java进程,运行在日志收集节点—所谓日志收集节点就是服务器节点。
Agent主要由:source,channel,sink三个组件组成。
①source:source组件是专门用来收集数据的,从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
②channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性。并且它可以和任意数量的source和sink链接。支持的类型有: JDBC channel,File System channel,Memort channel等。
③sink:将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地。目标地可能是另一个sink,也可能HDFS,HBase。
它的组合形式举例:
Flume的主要插件:
1.Interceptors拦截器
用于source和channel之间,用来更改或者检查Flume的events数据。
2.管道选择器channels Selectors
在多管道是被用来选择使用那一条管道来传递数据(events)。管道选择器又分为如下两种:
默认管道选择器: 每一个管道传递的都是相同的events多路复用通道选择器: 依据每一个event的头部header的地址选择管道。
3.sink线程
用于激活被选择的sinks群中特定的sink,用于负载均衡。
考虑单一应用场景,从简化系统的角度考虑,在满足应用需求的情况下可能只使用一个比较好。但是考虑到现有系统业务发展,为了后面的灵活扩展,在先用系统设计时留有一定的扩展性感觉更重要,可能使用Flume+kafka架构相对只使用Kafka会多占用1-2台机器做Flume日志采集,但是为了方便以后日志数据处理方式的扩展,可以采用Flume+kafka架构。
Flume :管道 ----个人认为比较适合有多个生产者场景,或者有写入Hbase、HDFS和kafka需求的场景。
Kafka :消息队列-----由于Kafka是Pull模式,因此适合有多个消费者的场景。
目前应用场景,一台日志转发机负责产生日志。后端需要通过Strom消费日志信息,建议可以设置成log-->Kafka->Strom.如果以后有写入Hbase或者HDFS的需求可以,在Kafka后面再接上Strom,或者在日志转发机上直接日志落地,由Flume去读取日志消息。
大数据平台每天会产生大量的日志,处理这些日志需要特定的日志系统。
一般而言,这些系统需要具有以下特征:
①构建应用系统和分析系统的桥梁,并将它们之间的关联解耦;
②支持近实时的在线分析系统和类似于Hadoop之类的离线分析系统;
③具有高可扩展性。即:当数据量增加时,可以通过增加节点进行水平扩展。
为此建议将日志采集分析系统分为如下几个模块:
①数据采集模块:负责从各节点上实时采集数据,建议选用Flume-NG来实现。
②☆☆☆☆☆数据接入模块:由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,建议选用Kafka来实现。
③流式计算模块:对采集到的数据进行实时分析,建议选用Storm来实现。
④数据输出模块:对分析后的结果持久化,可以使用HDFS、MySQL等。
1)日志采集选型--Flume:
a.Flume组件特点:
Flume是一个分布式、可靠、高可用的海量日志采集、聚合和传输的日志收集系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
b.Flume的设计目标
可靠性:
Flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,
待数据真正到达目的地后,删除自己缓存的数据。Flume 使用事务性的方式保证传送Event整个过程的可靠性。
可扩展性:
Flume中只有一个角色Agent,其中包含Source、Sink、Channel三种组件。一个Agent的Sink可以输出到另一个
Agent的Source。这样通过配置可以实现多个层次的流配置。
功能可扩展性:
Flume自带丰富的Source、Sink、Channel实现。用户也可以根据需要添加自定义的组件实现, 并在配置中使用起来。
除了单Agent的架构外,还可以将多个Agent组合起来形成多层的数据流架构:
①多个Agent顺序连接:将多个Agent顺序连接起来,将最初的数据源经过收集,存储到最终的存储系统中。一般情况下,应该控制这种顺序连接的Agent的数量,因为数据流经的路径变长了,如果不考虑Failover的话,出现故障将影响整个Flow上的Agent收集服务。
②多个Agent的数据汇聚到同一个Agent:这种情况应用的场景比较多,适用于数据源分散的分布式系统中数据流汇总。
③多路(Multiplexing)Agent:多路模式一般有两种实现方式,一种是用来复制,另一种是用来分流。复制方式可以将最前端的数据源复制多份,分别传递到多个Channel中,每个Channel接收到的数据都是相同的。分流方式,Selector可以根据Header的值来确定数据传递到哪一个Channel。
④实现Load Balance功能:Channel中Event可以均衡到对应的多个Sink组件上,而每个Sink组件再分别连接到一个独立的Agent上,这样可以实现负载均衡。
2)日志采集选型--Kafka:
Producer(生产者)->Broker(经纪人)->Consumer(消费者)
a.Kafka组件特点:
kafka实际上是一个消息发布订阅系统。Producer向某个Topic发布消息,而Consumer订阅某个Topic的消息。一旦有新的关于某个Topic的消息,Broker会传递给订阅它的所有Consumer。
b.Kafka的设计目标:
①数据在磁盘上的存取代价为O(1)
Kafka以Topic来进行消息管理,每个Topic包含多个Partition,每个Partition对应一个逻辑log,由多个Segment组成。每个Segment中存储多条消息。消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
②为发布和订阅提供高吞吐量
Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
③分布式系统,易于向外扩展
所有的Producer、Broker和Consumer都会有多个,均为分布式的。无需停机即可扩展机器。
c.Kafka的架构
Kafka是一个分布式的、可分区的、可复制的消息系统,维护消息队列。
Kafka的整体架构非常简单,是显式分布式架构,Producer、Broker和Consumer都可以有多个。
Producer,consumer实现Kafka注册的接口,数据从Producer发送到Broker,Broker承担一个中间缓存和分发的作用。
Broker分发注册到系统中的Consumer。Broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单、高性能、且与编程语言无关的TCP协议。
3)Flume与Kafka的比较
Flume和Kafka都是优秀的日志系统,其都能实现数据采集、数据传输、负载均衡、容错等一系列的需求, 但是两者之间还是有着一定的差别。
由以上可以看出Flume和Kafka还是各有优缺点的,所以要配合使用:
Flume适用于没有编程的配置解决方案,由于提供了丰富的source、channel、sink实现,各种数据源的引入只是配置变更就可实现。Kafka 适用于对数据管道的吞吐量、可用性要求都很高的解决方案,基本需要编程实现数据的生产和消费。
总结:☆☆☆☆☆
建议采用Flume作为数据的生产者,这样可以不用编程就实现数据源的引入,并采用Kafka Sink作为数据的消费者,这样可以得到较高的吞吐量和可靠性。如果对数据的可靠性要求高的话,可以采用Kafka Channel来作为Flume的Channel使用。
Flume对接Kafka:
Flume作为消息的生产者,将生产的消息数据(日志数据、业务请求数据等)通过Kafka Sink发布到Kafka中。
以下是对接配置:
对接示例:
假设现有Flume实时读取/data1/logs/component_role.log的数据并导入到Kafka的mytopic主题中:
环境预设为:
Zookeeper 的地址为 zdh100:2181 zdh101:2181 zdh102:2181
Kafka broker的地址为 zdh100:9092 zdh101:9092 zdh102:9093
配置Flume agent,如下修改Flume配置:
gent1.sources = logsrc
agent1.channels = memcnl
agent1.sinks = kafkasink
#source section
agent1.sources.logsrc.type = exec
agent1.sources.logsrc.command = tail -F /data1/logs/component_role.log
agent1.sources.logsrc.shell = /bin/sh -c
agent1.sources.logsrc.batchSize = 50
agent1.sources.logsrc.channels = memcnl
# Each sink's type must be defined
agent1.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkasink.brokerList=zdh100:9092, zdh101:9092,zdh102:9092
agent1.sinks.kafkasink.topic=mytopic
agent1.sinks.kafkasink.requiredAcks = 1
agent1.sinks.kafkasink.batchSize = 20
agent1.sinks.kafkasink.channel = memcnl
# Each channel's type is defined.
agent1.channels.memcnl.type = memory
agent1.channels.memcnl.capacity = 1000
启动该Flume节点:
/home/mr/flume/bin/flume-ng agent -c
/home/mr/flume/conf -f /home/mr/flume/conf/flume-conf.properties -n agent1 -Dflume.monitoring.type=
http -Dflume.monitoring.port=10100
动态追加日志数据,执行命令向 /data1/logs/component_role.log 添加数据:
echo "测试代码" >> /data1/logs/component_role.log
echo "检测Flume+Kafka数据管道通畅" >> /data1/logs/component_role.log
验证Kafka数据接收结果,执行命令检查Kafka收到的数据是否正确,应该可以呈现刚才追加的数据:
/home/mr/kafka/bin/kafka-console-consumer.sh --zookeeper zdh100:2181 --topic mytopic --from-beginning
输出结果如下:
1)flume单节点安装
apache-flume-1.8.0-bin.tar.gz ---2017.10.04号发布
cd /usr/software
解压:tar -zxvf apache-flume-1.8.0-bin.tar.gz
ln -s apache-flume-1.8.0-bin flume
编辑环境变量:
vim /etc/profile
export JAVA_HOME
export HADOOP_HOME
export HBASE_HOME
export HIVE_HOME
export FLUME_HOME=/usr/software/flume
export PATH
source /etc/profile
配置flume-env.sh添加Java路径:
cd /usr/software/flume/conf
cp -r flume-env.sh.template flume-env.sh
chmod 777 flume-env.sh
vim flume-env.sh
export JAVA_HOME=/usr/software/java/jdk1.8.0_152
配置完成后版本测试:flume-ng version
编辑$flume_home/conf/flume-conf.properties
cp -r flume-conf.properties.template flume-conf.properties
vim flume-conf.properties
#agent1 name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
#Spooling Directory
#set source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/usr/software/flume/dir/logdfs #本地url
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
#set sink1
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=/home/hdfs/flume/logdfs #HDFS url
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=1
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
#rollInterval:默认值:30。hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;
如果设置成0,则表示不根据时间来滚动文件;
注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
#filePrefix:默认值:FlumeData。 写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。
#set channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/usr/software/flume/dir/logdfstmp/point #本地url
agent1.channels.channel1.dataDirs=/usr/software/flume/dir/logdfstmp #本地url
#agent1.channels.channel1.capacity=1000 #当channel的类型是memory时,激活capacity,存储容量为1000
#agent.channels.c1.transactionCapacity=100 #转换容量
编辑完保存flume-conf.properties
添加执行权限:chmod 777 flume-conf.properties
单点配置完成,copy到其它节点
scp -r apache-flume-1.8.0-bin hadoop02@hadoop02:/home/hadoop02
记得其它节点的变量也要编辑vim /etc/profile,编辑完之后source /etc/profile
logdfs目录下创建文件test-flume.txt,写入内容flume1+kafka+hadoop+hive+sqoop+mysql+spring+js+ajax+html
启动flume:
bin/flume-ng agent -n agent1 -c conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console
-c (--conf) : flume的conf文件路径
-f (--conf-file) : 自定义的flume配置文件
-n (--name): 自定义的flume配置文件中agent的name
注:命令中的agent1表示配置文件中的Agent的Name,如配置文件中的agent1。flume-conf.properties表示
配置文件所在配置,需填写准确的配置文件路径。
注:配置中的目录若不存在,需在HDFS中提前创建。
启动成功!以前的txt变身完毕
hdfs出现相应文件
hdfs dfs -cat /home/hdfs/flume/logdfs/2018-05-17.1526548481565
把hadoop01中的配置文件flume-conf.properties copy到其它机器:
scp -r flume-conf.properties hadoop02@hadoop02:/home/hadoop02
scp -r flume-conf.properties hadoop03@hadoop03:/home/hadoop03
scp -r flume-conf.properties hadoop04@hadoop04:/home/hadoop04
scp -r flume-conf.properties hadoop05@hadoop05:/home/hadoop05
这样,每台机器都可以作为单点··
2)flume高可用配置部署
名称 HOST 角色
Agent1 hadoop01 Web Server
Agent2 hadoop02 Web Server
Agent3 hadoop03 Web Server
Collector1 hadoop04 AgentMstr1
Collector2 hadoop05 AgentMstr1
Agent1,Agent2,Agent3数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,
可以自动切换和恢复。
下图为flume高可用五台机器的架构图
根据上面的单点flume配置,基本配置完成,再添加两个配置文件,它们分别是flume-client.properties和flume-server.properties
cd /usr/software/flume/conf
touch flume-client.properties
touch flume-server.properties
开始配置:
配置Agent1,Agent2,Agent3,分别位于hadoop01~hadoop03三台机器,配置相同,把agent1->agengt2相应的变更,三个agent名如下所示:
注:一个agent1只能采集一个nginx的access.log的日志,所以想让agent2和agent3都采集到数据需要在hadoop02和hadoop03都配置nginx(以后做)
vim flume-client.properties
#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
#set gruop
agent1.sinkgroups = g1
#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 100000 #100kb,单位字节(bytes)
agent1.channels.c1.transactionCapacity = 10000 #10kb,单位字节(bytes)
agent1.channels.c1.keep-alive=30
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /usr/local/nginx/logs/access.log
#配置需要监控的日志输出目录
#agent1.sources.r1.interceptors = i1 i2
#agent1.sources.r1.interceptors.i1.type = static
#agent1.sources.r1.interceptors.i1.key = Type
#agent1.sources.r1.interceptors.i1.value = LOGIN
#agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop04 #指定Collector1
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop05 #指定Collector2
agent1.sinks.k2.port = 52020
#set sink group
agent1.sinkgroups.g1.sinks = k1 k2
#set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10 #优先权k1>k2
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000
保存退出,hadoop01~03为agent1~agent3,配置均一致。
注意:两个collector中ip/主机名不同需要修改,其它一致
编辑hadoop04下的配置文件
vim flume-server.properties
#set Agent name
collector1.sources = r1
collector1.channels = c1
collector1.sinks = k1
#set channel
collector1.channels.c1.type = memory
collector1.channels.c1.capacity = 1000000 #100kb,单位字节(bytes)
collector1.channels.c1.transactionCapacity = 100000 #10kb,单位字节(bytes)
collector1.channels.c1.keep-alive = 30 #单位是秒,默认是3秒。此参数用来控制channel满时影响source的发送,
channel空时影响sink的消费,就是等待时间,超过这个时间就甩异常,
# other node 一般不需配置,但是有些情况很有用,比如你得场景是每分钟开头集中发一次数据,
collector1.sources.r1.type = avro 这时每分钟的开头量可能比较大,后面会越来越小,这时你可以调大这个参数,不至
collector1.sources.r1.bind = hadoop04 于出现channel满了得情况;
collector1.sources.r1.port = 52020
#collector1.sources.r1.interceptors = i1
#collector1.sources.r1.interceptors.i1.type = static 就是batchSize一定不能大于transactionCapacity
#collector1.sources.r1.interceptors.i1.key = Collector
#collector1.sources.r1.interceptors.i1.value = hadoop04
collector1.sources.r1.channels = c1
#set sink to hdfs
collector1.sinks.k1.type=hdfs
collector1.sinks.k1.channel=c1 #采集后路径上传路径
collector1.sinks.k1.hdfs.useLocalTimeStamp = true #true代表使用当地时间戳
#collector1.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs
collector1.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs/%Y-%m-%d
collector1.sinks.k1.hdfs.minBlockReplicas=1 #默认值:HDFS副本数,写入HDFS文件块的最小副本数。
collector1.sinks.k1.hdfs.batchSize = 1000 #默认值:100,每个批次刷新到HDFS上的events数量;
collector1.sinks.k1.hdfs.fileType=DataStream
collector1.sinks.k1.hdfs.writeFormat=TEXT
collector1.sinks.k1.hdfs.callTimeout=600000 #callTimeout单位是毫秒600000毫秒=10分钟
collector1.sinks.k1.hdfs.rollInterval=0 #默认单位:秒。rollInterval的配置表示每个120秒回滚到下一个文件,hdfs sink间隔多长将临时文件滚动成最终目标文件
collector1.sinks.k1.hdfs.rollSize=52428800 #rollSize默认值:1024,当临时文件达到该大小(单位:bytes)时,
#1048576 byte=1MB,52428800=50MB 滚动成目标文件;如果设置成0,则表示不根据临时文件大小来滚动文件;
collector1.sinks.k1.hdfs.rollCount=0 #rollCount默认值:10,当events数据达到该数量时候,将临时文件滚动成目标文件;
collector1.sinks.k1.hdfs.round=true #round的理解是,每3分钟产生一个新文件
collector1.sinks.k1.hdfs.roundValue=10
collector1.sinks.k1.hdfs.roundUnit=minute
collector1.sinks.k1.hdfs.filePrefix=nginx-%H%M%S
collector1.sinks.k1.hdfs.fileSuffix=.log
#collector1.sinks.k1.hdfs.filePrefix=%Y-%m-%d
#collector1.sinks.k1.hdfs.inUseSuffix = .tmp #临时文件后缀名
collector1.sinks.k1.hdfs.idleTimeout = 600 #idleTimeout:默认值:0当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
编辑hadoop05下的配置文件
vim flume-server.properties
#set Agent name
collector2.sources = r1
collector2.channels = c1
collector2.sinks = k1
#set channel
collector2.channels.c1.type = memory
collector2.channels.c1.capacity = 1000000 #100kb,单位字节(bytes)
collector2.channels.c1.transactionCapacity = 100000 #10kb,单位字节(bytes)
collector2.channels.c1.keep-alive = 30
# other node,nna to nns
collector2.sources.r1.type = avro
collector2.sources.r1.bind = hadoop05
collector2.sources.r1.port = 52020
#collector2.sources.r1.interceptors = i1
#collector2.sources.r1.interceptors.i1.type = static
#collector2.sources.r1.interceptors.i1.key = Collector
#collector2.sources.r1.interceptors.i1.value = hadoop05
collector2.sources.r1.channels = c1
#set sink to hdfs
collector2.sinks.k1.type=hdfs
collector2.sinks.k1.channel=c1 #采集后路径上传路径
collector2.sinks.k1.hdfs.useLocalTimeStamp = true #true代表使用当地时间戳
#collector2.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs
collector2.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs/%Y-%m-%d
collector2.sinks.k1.hdfs.minBlockReplicas=1 #默认值:HDFS副本数,写入HDFS文件块的最小副本数。
collector2.sinks.k1.hdfs.batchSize = 1000 #默认值:100,每个批次刷新到HDFS上的events数量;
collector2.sinks.k1.hdfs.fileType=DataStream
collector2.sinks.k1.hdfs.writeFormat=TEXT
collector2.sinks.k1.hdfs.callTimeout=600000 #callTimeout单位是毫秒600000毫秒=10分钟
collector2.sinks.k1.hdfs.rollInterval=0 #默认单位:秒。rollInterval的配置表示每个120秒回滚到下一个文件,hdfs sink间隔多长将临时文件滚动成最终目标文件
collector2.sinks.k1.hdfs.rollSize=52428800 #rollSize默认值:1024,当临时文件达到该大小(单位:bytes)时,
#1048576 byte=1MB,52428800=50MB 滚动成目标文件;如果设置成0,则表示不根据临时文件大小来滚动文件;
collector2.sinks.k1.hdfs.rollCount=0 #rollCount默认值:10,当events数据达到该数量时候,将临时文件滚动成目标文件;
collector2.sinks.k1.hdfs.round=true #round的理解是,每3分钟产生一个新文件
collector2.sinks.k1.hdfs.roundValue=10
collector2.sinks.k1.hdfs.roundUnit=minute
collector2.sinks.k1.hdfs.filePrefix=nginx-%H%M%S
collector2.sinks.k1.hdfs.fileSuffix=.log
#collector2.sinks.k1.hdfs.filePrefix=%Y-%m-%d
#collector2.sinks.k1.hdfs.inUseSuffix = .tmp #临时文件后缀名
collector2.sinks.k1.hdfs.idleTimeout = 600 #idleTimeout:默认值:0当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
以上配置完所有节点后,开始启动!!
cp /usr/software/hadoop-3.0.1/etc/hadoop/core-site.xml /usr/software/flume/conf/
cp /usr/software/hadoop-3.0.1/etc/hadoop/hadoop-env.sh /usr/software/flume/conf/
cp /usr/software/hadoop-3.0.1/etc/hadoop/workers /usr/software/flume/conf/
3)flume高可用启动和测试
在agent节点上启动命令如下:(每个agent都需要启动)
bin/flume-ng agent -n agent1 -c conf -f conf/flume-client.properties -Dflume.root.logger=DEBUG,console
注:命令中的agent1表示配置文件中的Agent的Name,如配置文件中的agent1。flume-client.properties表示配置文件所在配置,需填写准确的配置文件路径。
在Collector节点上启动命令如下所示:
bin/flume-ng agent -n collector1 -c conf -f conf/flume-server.properties -Dflume.root.logger=DEBUG,console
注:命令中的collector1表示配置文件中的Agent的Name,如配置文件中的collector1。flume-server.properties表示配置文件所在配置,需填写准确的配置文件路径。
☆注意:一定先启动collector两个机器再启动agent三台机器,因为不启动collector没有Flume的进程“Application” 会报错:
agent和collector的flume进程名都是:Application
为了在远程自动启动shell过程中不出现在命令行出现众多的打印内容,且停留在打印信息上无法输入其它命令,而且
为了防止ctrl+c,可以直接停止的情况,所以用以下命令,让启动flume时不打印信息,且只运行在后台:
首先在/usr/software/路径下创建touch nohup.out文件
nohup bin/flume-ng agent -n collector1 -c conf -f conf/flume-server.properties -Dflume.root.logger=INFO,console > /usr/software/shell-script/nohup.out 2>&1 &
nohup bin/flume-ng agent -n collector2 -c conf -f conf/flume-server.properties -Dflume.root.logger=INFO,console > /usr/software/shell-script/nohup.out 2>&1 &
nohup bin/flume-ng agent -n agent1 -c conf -f conf/flume-client.properties -Dflume.root.logger=INFO,console > /usr/software/shell-script/nohup.out 2>&1 &
高可用故障转移测试:
我们在Agent1节点上传文件,由于我们配置Collector1的权重比Collector2大,所以 Collector1优先采集并上传到存储系统。然后我们kill掉Collector1,此时有Collector2负责日志的采集上传工作,之后,我 们手动恢复Collector1节点的Flume服务,再次在Agent1上次文件,发现Collector1恢复优先级别的采集工作。
在hadoop02的/usr/software目录下创建两个文件
touch test.log
touch test-agagin.log
写入内容后,启动hadoop01~hadoop03的agent,和hadoo04、hadoop05的collector
因为配置文件中配置的是:agent1.sources.r1.command = tail -F /usr/software/flume/dir/logdfs/test.log
所以只有在/usr/software/flume/dir/logdfs目录下名字叫test.log的文件才会被执行,显然这种配置是十分局限的,此处只为测试用,之后会根据具体情况进行配置。