热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flume介绍及调优

一、概述Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flum

一、概述

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

 

二、Flume特性

(1)可靠性

当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Best effort(数据发送到接收方后,不会进行确认)。

(2)可扩展性

Flume采用了三层架构,分别为agent,collector和storage,每一层均可以水平扩展。其中,所有agent和collector由master统一管理,这使得系统容易监控和维护,且master允许有多个(使用ZooKeeper进行管理和负载均衡),这就避免了单点故障问题。

三、优势

(1)Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase

(2)当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据.

(3)提供上下文路由特征

(4) Flume的管道是基于事务,保证了数据在传送和接收时的一致性.

(5)Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的。

四、工作方式

Flume-og采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。Flume Master间使用gossip协议同步数据。

Flume-ng最明显的改动就是取消了集中管理配置的 Master 和 Zookeeper,变为一个纯粹的传输工具。Flume-ng另一个主要的不同点是读入数据和写出数据由不同的工作线程处理(称为 Runner)。 在 Flume-og 中,读入线程同样做写出工作(除了故障重试)。如果写出慢的话(不是完全失败),它将阻塞 Flume 接收数据的能力。这种异步的设计使读入线程可以顺畅的工作而无需关注下游的任何问题。

五、Flume架构

Flume主要分为Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application,数据从源头经过Agent的这几个组件最后到达目的地。一个Flume服务可同时运行多个Agent,大致架构如下图:

 

数据源种类有很多,可以来自directory、http、kafka等,Flume提供了Source组件用来采集数据源。Source种类如下:

(1)spooling directory source:采集目录中的日志

(2)htttp source:采集http中的日志

(3)kafka source:采集kafka中的日志

采集到的日志需要进行缓存,Flume提供了Channel组件用来缓存数据。Channel种类如下:

(1)memory channel:缓存到内存中(最常用)

(2)JDBC channel:通过JDBC缓存到关系型数据库中

(3)kafka channel:缓存到kafka中

缓存的数据最终需要进行保存,Flume提供了Sink组件用来保存数据。Sink种类如下:

(1)HDFS sink:保存到HDFS中

(2)HBase sink:保存到HBase中

(3)Hive sink:保存到Hive中

(4)kafka sink:保存到kafka中

 

六、适用场景及案例分析

日志--->Flume--->实时计算(如kafka/MQ+Storm/Spark Streaming)、日志--->Flume--->离线计算(如ODPS、HDFS、HBase)、日志--->Flume--->ElasticSearch等。

下面结合一个大数据实时处理系统(Flume+Kafka+Spark Streaming+Redis)阐述下Flume在实际应用中所扮演的重要角色。

该实时处理系统整体架构如下:

比如我们要实时统计用户在某个网站上的PV(页面浏览量)、UV(独立访客),那么,对于Flume而言,它的作用就是在于采集用户数据,并且将其发送到kafka集群中指定的topic上。

在我们的场景中,需要配置三个Flume Agent,其中两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都发送到另外一个Flume Agent上。

部署在Web服务器上的两个Flume Agent添加配置文件flume-sink-avro.conf,其配置内容如下:

配置完成后,启动Flume Agent,即可对日志文件进行监听:

Flume Consolidation Agent添加配置文件flume-source_avro-sink_kafka.conf,其配置内容如下:

配置完成后,启动Flume Agent,即可对avro的数据进行监听:

完成上述操作后,如果在Web服务器上有新增的日志数据,就会被我们的Flume程序监听到,并且最终会传输到Kafka的f-k-s topic中,通过Flume强大的数据采集功能,为整个实时处理系统提供了数据保障,之后就可以进行后续的一系列操作。

另外,想要利用Flume采集到更有价值、更符合各自业务需求的数据,我们不得不谈到Flume的事务及拦截器的功劳。

 

 1、事务

事务保证了数据的可用性(有别于数据库中的事务)。下图的数据流是spooling directory source-> memory channel-> kafka sink,其中memory channel维护了两个事务,分别是PUT事务和Take事务。

1)PUT事务

(1)批量数据循环PUT到putList中;

(2)Commit,把putList队列中的数据offer到queue队列中,然后释放信号量,清空(clear)putList队列;

(3)Rollback,清空(clear)putList队列。

2)Take事务

(1)检查takeList队列大小是否够用,从queue队列中poll;

(2)Event到takeList队列中;

(3)Commit,表明被Sink正确消费掉,清空(clear)takeList队列;

(4)Rollback,异常出现,则把takeList队列中的Event返还到queue队列顶部。

 

 2、拦截器

有的时候我们希望通过Flume将读取的数据按照业务类型分开存储,或是丢弃或修改一些数据,这时可以考虑使用拦截器Interceptor。

拦截器通过定义类继承org.apache.flume.interceptor.Interceptor接口来实现,用户可以通过该节点定义规则来修改或者丢弃事件。Flume支持链式拦截,通过在配置中指定构建的拦截器类的名称,在source的配置中,拦截器被指定为一个以空格为间隔的列表,它按照指定的顺序调用,一个拦截器返回的事件列表被传递到链中的下一个拦截器,当一个拦截器要丢弃某些事件时,拦截器只需要在返回事件列表时不返回该事件即可,若拦截器要丢弃所有事件,则其返回一个空的事件列表。

      public interface Interceptor {

         public void initialize();

         public Event intercept(Event event);

         public List intercept(List events);

         public void close();

         public interface Builder extends Configurable {

            public Interceptor build();

         }

}

Flume内置拦截器列举如下:

(1)时间戳拦截器

  该拦截器的作用是将时间戳插入到Flume的事件报头中。Source连接到时间戳拦截器的配置:

a1.sources.r1.interceptors=timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false

(2)主机拦截器

  该拦截器插入服务器的ip地址或者主机名,agent将这些内容插入到事件的报头中。Source连接到主机拦截器的配置:

a1.sources.r1.interceptors=host

a1.sources.r1.interceptors.host.type=host

a1.sources.r1.interceptors.host.useIP=false a1.sources.r1.interceptors.timestamp.preserveExisting=true

(3)静态拦截器

  该拦截器的作用是将k/v插入到事件的报头中。Source连接到静态拦截器的配置:

a1.sources.r1.interceptors = static

  a1.sources.r1.interceptors.static.type=static

a1.sources.r1.interceptors.static.key=logs a1.sources.r1.interceptors.static.value=logFlume a1.sources.r1.interceptors.static.preserveExisting=false

(4)正则过滤拦截器

  该拦截器可以过滤掉不需要的日志,也可以根据需要收集满足正则条件的日志。

Source连接到正则过滤拦截器的配置:

a1.sources.r1.interceptors=regex

a1.sources.r1.interceptors.regex.type=REGEX_FILTER a1.sources.r1.interceptors.regex.regex=.* a1.sources.r1.interceptors.regex.excludeEvents=false

其中regex=.*匹配除“\n”之外的任何字符,excludeEvents=false默认收集匹配到的事件;若为true,则会删除匹配到的event,收集未匹配到的。

 

七、注意事项

1、Flume的停止

      使用kill停止Flume进程,不可使用kill -9,因为Flume内部注册了很多钩子函数执行善后工作,如果使用kill -9会导致钩子函数不执行,使用kill时,Flume内部进程会监控到用户的操作,然后调用钩子函数,执行一些善后操作,正常退出。

2、Flume数据丢失问题

      Flume可能丢失数据的情况是Channel采用memoryChannel,agent宕机导致数据丢失,或者Channel存储数据已满,导致Source不再写入,未写入的数据丢失。另外,Flume有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。    

3、Sink从Channel中读取数据的方式

   默认情况下,Sink获取数据的方式是:当Source向Channel发送一条数据的时候,Sink会通过循环的方式获取一条数据,然后再发送给客户端。

   Sink可以分为KafkaSink和AvroSink, 它们都是通过循环的方式获取数据,但是 KafkaSink可以通过配置topic进行批量从客户端读取。但原理还是一条一条的从Channel读取数据,只是在Sink中存在缓存机制,当数据量达到某一数量的时候,会将数据批量发送到客户端。

  4、CPU占用过高的问题

  若程序运行出现CPU占用过高的现象,则可以在代码中加入休眠sleep,这样的话,就可以释放CPU资源,注意,内存资源不会释放,因为线程还未结束,是可用状态。

 

八、性能调优

Flume经常被用在生产环境中收集后端产生的日志,一个Flume进程就是一个Agent,要充分发挥Flume的性能最主要的是要调好Flume的配置参数。

Flume agent配置分为三个部分:Source、Channel、Sink。

1、Source

(1)增加Source个数(使用tairDirSource时可增加filegroups个数)可以增大Source读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个Source以保证Source有足够的能力获取到新产生的数据。

(2)batchSize参数决定Source一次批量传输到Channel的event条数,适当调大这个参数可以提高Source搬运Event到Channel时的性能。

2、Channel 

(1)type选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据;type选择file时Channel的容错性更好,但是性能上会比memory channel差。使用file Channel时dataDirs配置多个不同盘下的目录可以提高性能。

(2)capacity参数决定Channel可容纳最大的event条数;transactionCapacity参数决定每次Source往channel里面写的最大event条数和每次Sink从channel里面读的最大event条数;transactionCapacity需要大于Source和Sink的batchSize参数;byteCapacity是Channel的内存大小,单位是byte。  

 

3、Sink 

(1)增加Sink的个数可以增加Sink消费event的能力。当然Sink也不是越多越好,够用就行,过多的Sink会占用系统资源,造成系统资源不必要的浪费。

(2)batchSize参数决定Sink一次批量从Channel读取的event条数,适当调大这个参数可以提高Sink从Channel搬出event的性能。



推荐阅读
  • 大数据领域的职业路径与角色解析
    本文将深入探讨大数据领域的各种职业和工作角色,帮助读者全面了解大数据行业的需求、市场趋势,以及从入门到高级专业人士的职业发展路径。文章还将详细介绍不同公司对大数据人才的需求,并解析各岗位的具体职责、所需技能和经验。 ... [详细]
  • javascript分页类支持页码格式
    前端时间因为项目需要,要对一个产品下所有的附属图片进行分页显示,没考虑ajax一张张请求,所以干脆一次性全部把图片out,然 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • hdfs数据节点分发什么协议_HDFS主要节点解说(一)节点功能
    HDFS是一个主从(MaterSlave)体系结构。从终于用户的角度来看,它就像传统的文件系统一样,能够通过文件夹路径对文件运行CRUD(Create、 ... [详细]
  • 阿里云 Aliplayer高级功能介绍(八):安全播放
    如何保障视频内容的安全,不被盗链、非法下载和传播,阿里云视频点播已经有一套完善的机 ... [详细]
  • 本文介绍了 Python 中的基本数据类型,包括不可变数据类型(数字、字符串、元组)和可变数据类型(列表、字典、集合),并详细解释了每种数据类型的使用方法和常见操作。 ... [详细]
  • 直播带货系统中的推流技术详解
    本文介绍了RTMP(实时消息传输协议)及其在直播带货系统中的应用,并详细探讨了带货直播系统的连麦方案,包括服务端合流和客户端合流的优势与劣势。 ... [详细]
  • 兆芯X86 CPU架构的演进与现状(国产CPU系列)
    本文详细介绍了兆芯X86 CPU架构的发展历程,从公司成立背景到关键技术授权,再到具体芯片架构的演进,全面解析了兆芯在国产CPU领域的贡献与挑战。 ... [详细]
  • 微信小程序详解:概念、功能与优势
    微信公众平台近期向200位开发者发送了小程序的内测邀请。许多人对微信小程序的概念还不是很清楚。本文将详细介绍微信小程序的定义、功能及其独特优势。 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • 本文介绍了如何使用Hive分析用户最长连续登录天数的方法。首先对数据进行排序,然后计算相邻日期之间的差值,接着按用户ID分组并累加连续登录天数,最后求出每个用户的最大连续登录天数。此外,还探讨了该方法在其他领域的应用,如股票市场中最大连续涨停天数的分析。 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • Hadoop的分布式架构改进与应用
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
author-avatar
黄家驹1994
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有