热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

方方面面|也就是_flume拦截器及问题解决

篇首语:本文由编程笔记#小编为大家整理,主要介绍了flume拦截器及问题解决相关的知识,希望对你有一定的参考价值。概述

篇首语:本文由编程笔记#小编为大家整理,主要介绍了flume拦截器及问题解决相关的知识,希望对你有一定的参考价值。



概述

Flume 除了主要的三大组件 Source、Channel和 Sink,还有一些其他灵活的组件,如拦截器、SourceRunner运行器、Channel选择器和Sink处理器等。


组件框架图

今天主要来看看拦截器,先看下组件框架流程图,熟悉了大致框架流程学习起来必然会更加轻松: 






  1. 接收事件
  2. 根据配置选择对应的Source运行器(EventDrivenSourceRunner 和 PollableSourceRunner)
  3. 处理器处理事件(Load-Balancing Sink 和 Failover Sink 处理器)
  4. 将事件传递给拦截器链
  5. 将每个事件传递给Channel选择器
  6. 返回写入事件的Channel列表
  7. 将所有事件写入每个必需的Channel,只有一个事务被打开
  8. 可选Channel(配置可选Channel后不管其是否写入成功)

拦截器

拦截器(Interceptor)是简单插件式组件,设置在Source和Channel之间,Source接收到event在写入到对应的Channel之前,可以通过调用的拦截器转换或者删除过滤掉一部分event。通过拦截器后返回的event数不能大于原本的数量。在一个Flume 事件流程中,可以添加任意数量的拦截器转换或者删除从单个Source中来的事件,Source将同一个事务的所有事件event传递给Channel处理器,进而依次可以传递给多个拦截器,直至从最后一个拦截器中返回的最终事件event写入到对应的Channel中。 
flume-1.7版本支持的拦截器: 






编写自定义拦截器

自定义的拦截器编写,我们只需要实现一个Interceptor接口即可,该接口的定义如下:


  1. public interface Interceptor
  2. /* 任何需要拦截器初始化或者启动的操作就可以定义在此,无则为空即可 */
  3. public void initialize();
  4. /* 每次只处理一个Event */
  5. public Event intercept(Event event);
  6. /* 量处理Event */
  7. public List<Event> intercept(List<Event> events);
  8. /*需要拦截器执行的任何closing/shutdown操作&#xff0c;一般为空 */
  9. public void close();
  10. /* 获取配置文件中的信息&#xff0c;必须要有一个无参的构造方法 */
  11. public interface Builder extends Configurable
  12. public Interceptor build();

接口中的几个方法或者内部接口含义代码中已经标注&#xff0c;需要留意的地方就是考虑到多线程运行Source时&#xff0c;需要保证编写的代码是线程安全的。这里就不展示自定义拦截器代码了&#xff0c;仿照已有的拦截器&#xff0c;可以很容易的编写一个简单功能的自定义拦截器的。


实际使用及问题


问题&#xff1a;

目前环境中使用的都是tailSource、hdfsSink&#xff0c;在sink时根据时间对日志分割成不同的目录&#xff0c;但是实际过程中存在一些延迟&#xff0c;导致sink写入hdfs时的时间和日志文件中记录的时间存在一些差异&#xff1b;并且不能保留原有的日志文件名。


需求&#xff1a;


  1. 根据日志中记录的时间对文件进行分目录存储
  2. 将source端读取的日志名字符串添加至hdfsSink写入hdfs的文件名中&#xff08;在hdfs文件中可以根据文件名区分日志&#xff09;

日志格式如下&#xff1a;


  1. 2017/01/13 13:30:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  2. 2017/01/13 14:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  3. 2017/01/13 15:52:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  4. 2017/01/13 16:53:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  5. 2017/01/14 13:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  6. 2017/01/14 13:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  7. 2017/01/14 14:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
  8. 2017/01/14 14:56:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":

如何实现以上需求&#xff1f;


  1. 要了解TaildirSource如何读取日志文件&#xff0c;按行读取还是按数据量大小&#xff1f; 
    分析代码可知&#xff0c;无论单个事件操作还是批量操作均是按行读取
  2. hdfsSink如何对文件进行分目录&#xff1f; 
    若定义了hdfs.useLocalTimeStamp &#61; true &#xff0c;则是根据本地时间戳分目录&#xff0c;否则是从事件的header中获取时间戳。

明白了这两个问题&#xff0c;就可以继续往前走了。


实现需求1

Source端&#xff1a; 
经过调研查阅资料发现&#xff0c;有拦截器就可以直接实现该目标功能。使用RegexExtractorInterceptor正则抽取拦截器&#xff0c;匹配日志中的时间字符串&#xff0c;将其添加至Event的header中&#xff08;header的key值为timestamp&#xff09;&#xff0c;写入header时序列化只能使用org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer&#xff08;该序列化器内部根据配置传入的pattern将时间转换为时间戳格式&#xff09;&#xff1a;


  1. agent1.sources.r1.interceptors &#61; inter
  2. agent1.sources.r1.interceptors.inter.type &#61; regex_extractor
  3. agent1.sources.r1.interceptors.inter.regex &#61; ^(\\\\d\\\\d\\\\d\\\\d/\\\\d\\\\d/\\\\d\\\\d\\\\s\\\\d\\\\d:\\\\d\\\\d:\\\\d\\\\d).*
  4. agent1.sources.r1.interceptors.inter.serializers &#61; s1
  5. #agent1.sources.r1.interceptors.inter.serializers.s1.type &#61; org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer //该序列化内部只是将传入的匹配项直接返回return
  6. agent1.sources.r1.interceptors.inter.serializers.s1.type &#61; org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
  7. agent1.sources.r1.interceptors.inter.serializers.s1.name &#61; timestamp
  8. agent1.sources.r1.interceptors.inter.serializers.s1.pattern &#61; yyyy/MM/dd HH:mm:ss

Sink端&#xff1a; 
Sink端只需要注意不要设置hdfs.useLocalTimeStamp 为 true&#xff0c;也就是不使用本地时间&#xff0c;默认为false即可。


  1. agent1.sinks.k1.type &#61; hdfs
  2. agent1.sinks.k1.channel &#61; c2
  3. agent1.sinks.k1.hdfs.path &#61; /user/portal/tmp/syx/test2/%Y%m%d/%Y%m%d%H
  4. agent1.sinks.k1.hdfs.filePrefix &#61; events-%[localhost]-%timestamp //%[localhost] 获取主机名&#xff0c;%timestamp 获取事件header中key为timestamp的值value
  5. #agent1.sinks.k1.hdfs.useLocalTimeStamp &#61; true //注意此处直接使用Event header中的timestamp&#xff0c;不适用本地时间戳
  6. agent1.sinks.k1.hdfs.callTimeout &#61; 100000

实现需求2

tailDirSource端使用参数&#xff1a;




fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.


fileHeader 设置为 true &#xff0c;可以将日志文件的绝对路径存储在事件的header中&#xff1b; 
fileHeaderKey 目前来说不需要设置&#xff0c;它指定了存储在header中路径的key 名&#xff08;header中是以key-value对存储&#xff09;&#xff0c;默认为 file。如下&#xff1a;


  1. Event: headers:timestamp&#61;1452581700000, file&#61;/home/hadoop_portal/tiany/test.log body: 32 30 31 36 2F 30 31 2F 31 32 20 31 34 3A 35 35 2016/01/12 14:
推荐阅读
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • imx6ull开发板驱动MT7601U无线网卡的方法和步骤详解
    本文详细介绍了在imx6ull开发板上驱动MT7601U无线网卡的方法和步骤。首先介绍了开发环境和硬件平台,然后说明了MT7601U驱动已经集成在linux内核的linux-4.x.x/drivers/net/wireless/mediatek/mt7601u文件中。接着介绍了移植mt7601u驱动的过程,包括编译内核和配置设备驱动。最后,列举了关键词和相关信息供读者参考。 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • mysql-cluster集群sql节点高可用keepalived的故障处理过程
    本文描述了mysql-cluster集群sql节点高可用keepalived的故障处理过程,包括故障发生时间、故障描述、故障分析等内容。根据keepalived的日志分析,发现bogus VRRP packet received on eth0 !!!等错误信息,进而导致vip地址失效,使得mysql-cluster的api无法访问。针对这个问题,本文提供了相应的解决方案。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • 本文记录了在vue cli 3.x中移除console的一些采坑经验,通过使用uglifyjs-webpack-plugin插件,在vue.config.js中进行相关配置,包括设置minimizer、UglifyJsPlugin和compress等参数,最终成功移除了console。同时,还包括了一些可能出现的报错情况和解决方法。 ... [详细]
  • 本文介绍了在Mac上安装Xamarin并使用Windows上的VS开发iOS app的方法,包括所需的安装环境和软件,以及使用Xamarin.iOS进行开发的步骤。通过这种方法,即使没有Mac或者安装苹果系统,程序员们也能轻松开发iOS app。 ... [详细]
author-avatar
前前后后zzyyix
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有