目录
1、数据收集工具/系统产生背景
2、专业的数据收集工具
2.1、Chukwa
2.2、Scribe
2.3、Fluentd
2.4、Logstash
2.5、Apache Flume
3、Flume概述
3.1、Flume概念
3.2、Flume版本介绍
3.3、Flume数据源和输出方式
4、Flume体系结构/核心组件
4.1、概述
4.2、Flume三大核心组件
4.3、Flume经典部署方案
4.3.1、单Agent采集数据
4.3.2、多Agent串联
4.3.3、多Agent合并串联
4.3.4、多路复用
5、Flume实战案例
5.1、安装部署Flume
5.2、Flume实战案例
5.2.1、采集目录到HDFS
5.2.2、采集文件到HDFS
5.2.3、多路复用采集
5.2.4、多Agent串联采集
5.2.5、高可用部署采集
5.2.6、更多Source 和 Sink组件
6、综合案例
6.1、案例场景/需求
6.2、场景分析
6.3、数据处理流程分析
6.4、需求实现
Hadoop业务的整体开发流程:
任何完整的大数据平台,一般都会包括以下基本处理过程:
其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出。这其中包括:
我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠, 高性能和高扩展。
总结,数据来源大体包括:
Apache Chukwa 是 Apache 旗下另一个开源的数据收集平台,它远没有其他几个有名。Chukwa基于 Hadoop 的 HDFS 和 MapReduce 来构建(显而易见,它用 Java 来实现),提供扩展性和可靠性。Chukwa 同时提供对数据的展示,分析和监视。很奇怪的是它的上一次 Github 的更新事是7 年前。可见该项目应该已经不活跃了。
官网:http://chukwa.apache.org/
Scribe 是 Facebook 开源的日志收集系统,在 Facebook 内部已经得到的应用。它能够从各种日志源上收集日志,存储到一个中央存储系统(可以是 NFS,HDFS,或者其他分布式文件系统等)上,以便于进行集中统计分析处理。
官网:https://www.scribesoft.com/
Fluentd 是另一个开源的数据收集框架。Fluentd 使用 C/Ruby 开发,使用 JSON 文件来统一日志数据。它的可插拔架构,支持各种不同种类和格式的数据源和数据输出。最后它也同时提供了高可靠和很好的扩展性。
官网:https://www.fluentd.org/
Logstash 是著名的开源数据栈 ELK(ElasticSearch,Logstash,Kibana)中的那个 L。几乎在大部分的情况下 ELK 作为一个栈是被同时使用的。所有当你的数据系统使用 ElasticSearch 的情况下,Logstash 是首选。Logstash 用 JRuby 开发,所以运行时依赖 JVM。
官网:https://www.elastic.co/cn/products/logstash
Flume 是 Apache 旗下,开源,高可靠,高扩展,容易管理,支持客户扩展的数据采集系统。Flume 使用 JRuby 来构建,所以依赖 Java 运行环境。Flume 最初是由 Cloudera 的工程师设计 用于合并日志数据的系统,后来逐渐发展用于处理流数据事件。
官网:http://flume.apache.org/
Flume 是一个分布式、高可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。
1、 Apache Flume 是一个分布式、可靠、高可用的海量日志采集、聚合和传输的系统,和Sqoop 同属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用来采集流动型数据。
2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数据的采集,它支持从很多数据源聚合数据到 HDFS。
3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好的自定义扩展能力,因此,flume 可以适用于大部分的日常数据采集场景。
4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache的顶级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。
5、 Flume 的优势:可横向扩展、延展性、可靠性。
Flume 在 0.9.x and 1.x 之间有较大的架构调整:
官网文档:http://flume.apache.org/FlumeUserGuide.html
Flume 提供了从 console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog 日 志系统,支持 TCP 和 UDP 等 2 种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用 exec 方式进行日志采集。
Flume 的数据接受方,可以是 console(控制台)、text(文件)、dfs(HDFS 文件)、RPC(Thrift-RPC)和 syslogTCP(TCP syslog 日志系统)等。最常用的是 Kafka。
Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或者把事件推向另一个 Source。
Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。单 agent 由 Source、Sink 和 Channel 三大组件构成。如下图:
组件 | 功能 |
Agent | 使用 JVM 运行 Flume。每台机器运行一个 agent,但是可以在一个agent 中包含多个 sources 和 sinks。 |
Client | 生产数据,运行在一个独立的线程。 |
Source | 从Client收集数据,传递给Channel。 |
Sink | 从Channel收集数据,运行在一个独立线程上。 |
Channel | 连接Sources 和 Sinks,这个有点像一个队列。 |
Events | 可以是日志记录、avro对象等。 |
Event
Client
Agent
Source
Channel
Agent之Channel
Agent之Sink
Iterator
Channel Selector
Sink Processor
1、Flume 的安装非常简单,只需要解压即可,当然,前提是已有 Hadoop 环境上传安装包到 数据源所在节点上
然后解压 tar -zxvf apache-flume-1.8.0-bin.tar.gz
然后进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_HOME
2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)
3、指定采集方案配置文件,在相应的节点上启动 flume agent
先用一个最简单的例子来测试一下程序环境是否正常:
1、在$FLUME_HOME/agentconf 目录下创建一个数据采集方案,该方案就是从一个网络端口收集数据,也就是创一个任意命名的配置文件如下:netcat-logger.properties文件内容如下:
# 定义这个 agent 中各个组件的名字 a1.sources = r1
# 描述和配置 source 组件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444
# 描述和配置 sink 组件:k1 a1.sinks.k1.type = logger
# 描述和配置 channel 组件,此处使用是内存缓存的方式 a1.channels.c1.type = memory a1.channels.c1.transactionCapacity = 100
# 描述和配置 source channel sink 之间的连接关系 a1.sources.r1.channels = c1 |
2、启动 agent 去采集数据:
在$FLUME_HOME 下执行如下命令:
bin/flume-ng agent -c conf -f agentconf/netcat-logger.properties -n a1 - Dflume.root.logger=INFO,console
-c conf 指定 flume 自身的配置文件所在目录 -n a1 指定我们这个 agent 的名字 |
3、测试
先要往 agent 的 source 所监听的端口上发送数据,让 agent 有数据可采
例如在本机节点,使用 telnet localhost 44444 命令就可以
如果这个命令的执行过程中发现抛出异常说:command not found
那么请使用:sudo yum -y install telnet 这个命令进行 telnet 的安装
输入两行数据:
hello huangbo
1234
4、Flume-Agent 接收的结果
采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到 HDFS 中去
根据需求,首先定义以下 3 大要素:
配置文件编写:spooldir-hdfs.properties
#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1
# 配置 source 组件 agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false
#配置拦截器 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname
# 配置sink组件 agent1.sinks.sink1.hdfs.path=hdfs://myha01/flume_log/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = events agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 |
Channel 参数解释:
启动:bin/flume-ng agent -c conf -f agentconf/spooldir-hdfs.properties -n agent1
测试:
采集需求:比如业务系统使用 log4j 生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到 HDFS
根据需求,首先定义以下 3 大要素:
配置文件编写:tail-hdfs.properties
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1
# Describe/configure tail -F source1 agent1.sources.source1.channels = channel1
#configure host for source agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1 agent1.sinks.sink1.hdfs.filePrefix = tomcat_
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 |
启动:bin/flume-ng agent -c conf -f agentconf/tail-hdfs.properties -n agent1
测试:
作业。
架构设计:从 hadoop04 的 flume agent 传送数据到 hadoop05 的 flume agent
如现在我在两台机器上的测试,192.168.123.104 和 192.168.123.105 上面做 agent 的传递,分别是:
hadoop04:tail-avro.properties
hadoop05:avro-hdfs.properties
第一步:准备 hadoop04
在 IP 为 192.168.123.104 的 hadoop04 上的 agentconf 下创建一个 tail-avro.properties:
a1.sources = r1 a1.sinks = k1 a1.channels = c1
# Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.channels = c1
# Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = hadoop05 a1.sinks.k1.port = 4141 a1.sinks.k1.batch-size = 2
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
第二步:准备 hadoop05
再在 IP 为 192.168.123.105 的 hadoop05 机器上配置采集方案 avro-hdfs.properties:
a1.sources = r1 a1.sinks = k1 a1.channels = c1
# Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141
# Describe k1 a1.sinks.k1.hdfs.filePrefix = date_
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
第三步:最终测试
1、首先启动 hadoop05 机器上的 agent bin/flume-ng agent -c conf -n Dflume.root.logger=INFO,console
2、再启动 hadoop04 上的 agent bin/flume-ng agent -c conf -n Dflume.root.logger=INFO,console
3、执行一个普通的脚本往 hadoop04 的/home/hadoop/testlog/date.log 中追加数据:
4、至此会发现在 hadoop04 agent 发送的数据会转到 hadoop05 agent,然后被 sink 到了HDFS 的对应目录hdfs://myha01/testlog/flume-event/ |
第一步:
Flume-NG 的高可用架构图
图中,我们可以看出,Flume 的存储可以支持多种,这里只列举了 HDFS 和 Kafka(如:存储 最新的一周日志,并给 Storm 系统提供实时日志流)。
第二步:节点分配
Flume 的 Agent 和 Collector 分布如下表所示:
名称 | Host | 角色 |
Agent1 | hadoop02 | 日志服务器 |
Agent2 | hadoop03 | 日志服务器 |
Agent3 | hadoop04 | 日志服务器 |
Collector1 | hadoop04 | AgentMaster1 |
Collector2 | hadoop05 | AgentMaster2 |
图中所示,Agent1,Agent2,Agent3 数据分别流入到 Collector1 和 Collector2,Flume NG 本 身提供了 Failover 机制,可以自动切换和恢复。在上图中,有 3 个产生日志服务器分布在不 同的机房,要把所有的日志都收集到一个集群中存储。下面我们开发配置 Flume NG 集群
第三步:配置信息
在下面单点 Flume 中,基本配置都完成了,我们只需要新添加两个配置文件,它们是ha_agent.properties 和 ha_collector.properties,其配置内容如下所示:
ha_agent.properties 配置:
#agent name: agent1 agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 k2
#set gruop agent1.sinkgroups = g1
#set channel agent1.channels.c1.capacity = 1000 agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1 agent1.sources.r1.type = exec
agent1.sources.r1.interceptors = i1 i2 agent1.sources.r1.interceptors.i1.type = static agent1.sources.r1.interceptors.i1.key = Type agent1.sources.r1.interceptors.i1.value = LOGIN agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1 agent1.sinks.k1.type = avro agent1.sinks.k1.hostname = hadoop04 agent1.sinks.k1.port = 52020
# set sink2 agent1.sinks.k2.type = avro agent1.sinks.k2.hostname = hadoop05 agent1.sinks.k2.port = 52020
#set sink group agent1.sinkgroups.g1.sinks = k1 k2
#set failover agent1.sinkgroups.g1.processor.priority.k1 = 10 agent1.sinkgroups.g1.processor.priority.k2 = 1 agent1.sinkgroups.g1.processor.maxpenalty = 10000 |
ha_collector.properties 配置:
#set agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1
#set channel a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# other node,nna to nns a1.sources.r1.type = avro ## 当前主机为什么,就修改成什么主机名 a1.sources.r1.bind = hadoop04 a1.sources.r1.port = 52020 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = Collector a1.sources.r1.interceptors.i1.value = hadoop04 a1.sources.r1.channels = c1
#set sink to hdfs a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.writeFormat=TEXT a1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d a1.sinks.k1.channel=c1 |
注意:在把 ha_collector.properties 文件拷贝到另外一台 collector 的时候,记得更改该配置文 件中的主机名。在该配置文件中有注释
第四步:启动
先启动 hadoop04 和 hadoop05 上的 collector 角色:
然后启动 hadoop02,hadoop03,hadoop04 上的 agent 角色: |
更多 Sources:http://flume.apache.org/FlumeUserGuide.html#flume-sources
更多 Channels:http://flume.apache.org/FlumeUserGuide.html#flume-channels
更多 Sinks:http://flume.apache.org/FlumeUserGuide.html#flume-sinks
A、B 两台日志服务机器实时生产日志主要类型为 access.log、nginx.log、web.log。现在要求把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上然后统一收集到 HDFS中。但是在 hdfs 中要求的目录为:
/source/logs/access/20160101/**
/source/logs/nginx/20160101/**
/source/logs/web/20160101/**
第一:准备 3 台服务器
服务器 A 对应的 IP 为 192.168.123.103,主机名为 hadoop03
服务器 B 对应的 IP 为 192.168.123.104,主机名为 hadoop04
服务器 C 对应的 IP 为 192.168.123.105,主机名为 hadoop05
第二:设计采集方案 exec_source_avro_sink.properties
在服务器hadoop03和服务器hadoop04上的$FLUME_HOME/agentconf 创建采集方案的配置
文件 exec_source_avro_sink.properties,文件内容为:
# 指定各个核心组件 a1.sources = r1 r2 r3 a1.sinks = k1 a1.channels = c1
# 准备数据源 a1.sources.r1.type = exec
a1.sources.r2.type = exec a1.sources.r2.interceptors = i2 a1.sources.r2.interceptors.i2.type = static a1.sources.r2.interceptors.i2.key = type a1.sources.r2.interceptors.i2.value = nginx
a1.sources.r3.type = exec a1.sources.r3.interceptors = i35 a1.sources.r3.interceptors.i3.key = type a1.sources.r3.interceptors.i3.value = web
# Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop05 a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sources.r2.channels = c1 a1.sources.r3.channels = c1 a1.sinks.k1.channel = c1 |
第三:准备 avro_source_hdfs_sink.properties 配置文件
在服务器 C 上的$FLUME_HOME/agentconf 中创建配置文件 avro_source_hdfs_sink.properties文件内容为:
#定义 agent 名,source、channel、sink 的名称 a1.sources = r1
#定义 source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port =41414
#添加时间拦截器 a1.sources.r1.interceptors.i1.type=org.apache.flume.interceptor.TimestampInterceptor$Builder
#定义 channels a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity = 10000
#定义 sink a1.sinks.k1.hdfs.path=hdfs://myha01/source/logs/%{type}/%Y%m%d a1.sinks.k1.hdfs.filePrefix =events #时间类型 #生成的文件不按条数生成 a1.sinks.k1.hdfs.threadsPoolSize=10
#组装 source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
第四:启动
配置完成之后,在服务器 A 和 B 上的/home/hadoop/data 有数据文件 access.log、nginx.log、web.log。
先启动服务器 C(hadoop05)上的 flume,启动命令:在 flume 安装目录下执行:
bin/flume-ng agent -c conf -f agentconf/avro_source_hdfs_sink.properties -name a1 - Dflume.root.logger=DEBUG,console
然后在启动服务器上的 A(hadoop03)和 B(hadoop04),启动命令:在 flume 安装目录下执 行:
bin/flume-ng agent -c conf -f agentconf/exec_source_avro_sink.properties -name a1 - Dflume.root.logger=DEBUG,console
第五:测试
自行测试