作者:前前后后zzyyix | 来源:互联网 | 2023-09-23 20:43
篇首语:本文由编程笔记#小编为大家整理,主要介绍了flume拦截器及问题解决相关的知识,希望对你有一定的参考价值。
概述
Flume 除了主要的三大组件 Source、Channel和 Sink,还有一些其他灵活的组件,如拦截器、SourceRunner运行器、Channel选择器和Sink处理器等。
组件框架图
今天主要来看看拦截器,先看下组件框架流程图,熟悉了大致框架流程学习起来必然会更加轻松:
- 接收事件
- 根据配置选择对应的Source运行器(EventDrivenSourceRunner 和 PollableSourceRunner)
- 处理器处理事件(Load-Balancing Sink 和 Failover Sink 处理器)
- 将事件传递给拦截器链
- 将每个事件传递给Channel选择器
- 返回写入事件的Channel列表
- 将所有事件写入每个必需的Channel,只有一个事务被打开
- 可选Channel(配置可选Channel后不管其是否写入成功)
拦截器
拦截器(Interceptor)是简单插件式组件,设置在Source和Channel之间,Source接收到event在写入到对应的Channel之前,可以通过调用的拦截器转换或者删除过滤掉一部分event。通过拦截器后返回的event数不能大于原本的数量。在一个Flume 事件流程中,可以添加任意数量的拦截器转换或者删除从单个Source中来的事件,Source将同一个事务的所有事件event传递给Channel处理器,进而依次可以传递给多个拦截器,直至从最后一个拦截器中返回的最终事件event写入到对应的Channel中。
flume-1.7版本支持的拦截器:
编写自定义拦截器
自定义的拦截器编写,我们只需要实现一个Interceptor接口即可,该接口的定义如下:
public interface Interceptor
/* 任何需要拦截器初始化或者启动的操作就可以定义在此,无则为空即可 */
public void initialize();
/* 每次只处理一个Event */
public Event intercept(Event event);
/* 量处理Event */
public List<Event> intercept(List<Event> events);
/*需要拦截器执行的任何closing/shutdown操作&#xff0c;一般为空 */
public void close();
/* 获取配置文件中的信息&#xff0c;必须要有一个无参的构造方法 */
public interface Builder extends Configurable
public Interceptor build();
接口中的几个方法或者内部接口含义代码中已经标注&#xff0c;需要留意的地方就是考虑到多线程运行Source时&#xff0c;需要保证编写的代码是线程安全的。这里就不展示自定义拦截器代码了&#xff0c;仿照已有的拦截器&#xff0c;可以很容易的编写一个简单功能的自定义拦截器的。
实际使用及问题
问题&#xff1a;
目前环境中使用的都是tailSource、hdfsSink&#xff0c;在sink时根据时间对日志分割成不同的目录&#xff0c;但是实际过程中存在一些延迟&#xff0c;导致sink写入hdfs时的时间和日志文件中记录的时间存在一些差异&#xff1b;并且不能保留原有的日志文件名。
需求&#xff1a;
- 根据日志中记录的时间对文件进行分目录存储
- 将source端读取的日志名字符串添加至hdfsSink写入hdfs的文件名中&#xff08;在hdfs文件中可以根据文件名区分日志&#xff09;
日志格式如下&#xff1a;
2017/01/13 13:30:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/13 14:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/13 15:52:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/13 16:53:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/14 13:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/14 13:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/14 14:50:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
2017/01/14 14:56:00 ip:123.178.46.252 message:["s":"bbceif1484117100097","u":"354910072847819","id":"2x1kfBk63z","e":
如何实现以上需求&#xff1f;
- 要了解TaildirSource如何读取日志文件&#xff0c;按行读取还是按数据量大小&#xff1f;
分析代码可知&#xff0c;无论单个事件操作还是批量操作均是按行读取 - hdfsSink如何对文件进行分目录&#xff1f;
若定义了hdfs.useLocalTimeStamp &#61; true &#xff0c;则是根据本地时间戳分目录&#xff0c;否则是从事件的header中获取时间戳。
明白了这两个问题&#xff0c;就可以继续往前走了。
实现需求1
Source端&#xff1a;
经过调研查阅资料发现&#xff0c;有拦截器就可以直接实现该目标功能。使用RegexExtractorInterceptor正则抽取拦截器&#xff0c;匹配日志中的时间字符串&#xff0c;将其添加至Event的header中&#xff08;header的key值为timestamp&#xff09;&#xff0c;写入header时序列化只能使用org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer&#xff08;该序列化器内部根据配置传入的pattern将时间转换为时间戳格式&#xff09;&#xff1a;
agent1.sources.r1.interceptors &#61; inter
agent1.sources.r1.interceptors.inter.type &#61; regex_extractor
agent1.sources.r1.interceptors.inter.regex &#61; ^(\\\\d\\\\d\\\\d\\\\d/\\\\d\\\\d/\\\\d\\\\d\\\\s\\\\d\\\\d:\\\\d\\\\d:\\\\d\\\\d).*
agent1.sources.r1.interceptors.inter.serializers &#61; s1
#agent1.sources.r1.interceptors.inter.serializers.s1.type &#61; org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer //该序列化内部只是将传入的匹配项直接返回return
agent1.sources.r1.interceptors.inter.serializers.s1.type &#61; org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent1.sources.r1.interceptors.inter.serializers.s1.name &#61; timestamp
agent1.sources.r1.interceptors.inter.serializers.s1.pattern &#61; yyyy/MM/dd HH:mm:ss
Sink端&#xff1a;
Sink端只需要注意不要设置hdfs.useLocalTimeStamp 为 true&#xff0c;也就是不使用本地时间&#xff0c;默认为false即可。
agent1.sinks.k1.type &#61; hdfs
agent1.sinks.k1.channel &#61; c2
agent1.sinks.k1.hdfs.path &#61; /user/portal/tmp/syx/test2/%Y%m%d/%Y%m%d%H
agent1.sinks.k1.hdfs.filePrefix &#61; events-%[localhost]-%timestamp //%[localhost] 获取主机名&#xff0c;%timestamp 获取事件header中key为timestamp的值value
#agent1.sinks.k1.hdfs.useLocalTimeStamp &#61; true //注意此处直接使用Event header中的timestamp&#xff0c;不适用本地时间戳
agent1.sinks.k1.hdfs.callTimeout &#61; 100000
实现需求2
tailDirSource端使用参数&#xff1a;
fileHeader | false | Whether to add a header storing the absolute path filename. |
fileHeaderKey | file | Header key to use when appending absolute path filename to event header. |
fileHeader 设置为 true &#xff0c;可以将日志文件的绝对路径存储在事件的header中&#xff1b;
fileHeaderKey 目前来说不需要设置&#xff0c;它指定了存储在header中路径的key 名&#xff08;header中是以key-value对存储&#xff09;&#xff0c;默认为 file。如下&#xff1a;
Event: headers:timestamp&#61;1452581700000, file&#61;/home/hadoop_portal/tiany/test.log body: 32 30 31 36 2F 30 31 2F 31 32 20 31 34 3A 35 35 2016/01/12 14: