热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flume笔记一基础

FlumeFlume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定

Flume

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

架构
在这里插入图片描述

运行机制:

Flume 的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume 在删除自己缓存的数据。

核心的角色是 agent, agent 本身是一个 Java 进程, 一般运行在日志收集节点。 flume 采集系统就是由一个个 agent 所连接起来形成。

agent的三个组件:

Source:
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

Channel:
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel 以及 Kafka Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

Sink:
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。

在这里插入图片描述

安装:

1.解压
2.修改flume-env.sh
添加java环境变量
3.验证
flume-ng version

案例:


nc

netcat 源

1.flume配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 对当前agent的命名组件 a1:当前agent的名字 如果在同一节点有多个agent
# 需要区别开 source,sink,channel后边加s说明可能会有多个组件# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置当前的source 监听的节点和端口# Describe the sink
a1.sinks.k1.type = logger
# sink的类型是log# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# c1类型是内存级别 缓冲大小阈值单位:事件 一次传输的事件量# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 绑定source和sink到channel sink只能绑定一个channel 所以后面没有s

flume启动命令:

flume-ng agent --conf-file 配置文件 --name a1 -Dflume.root.logger=INFO,console
#agent:启动一个agent
#Dflume.root.logger=INFO,console 打印到控制台 不常用

flume启动后相当于开启了一个服务端
在这里插入图片描述
在另一个会话页面:

nc localhost 44444

相当于开启了一个客户端
此时在客户端输入就会在服务端以log形式打印到控制台
在这里插入图片描述
在这里插入图片描述

案例2


利用exec源监控某个文件

Exec Source在启动时运行给定的Unix命令,并期望进程在标准输出上产生连续的数据(除非属性logStdErr设置为true,否则stderr将被丢弃)。 如果进程由于任何原因退出,source也会退出,并且不会生成更多数据。

a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = exec
a1.sources.r1.command = tail -f a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

追加内容到要监控的文件

cat 2.txt >> flume.log

在这里插入图片描述

案例3:


flume-hdfs

flume要想将数据输出到hdfs,需要有hadoop相关jar包
在这里插入图片描述
flume官方手册

http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html

滚动文件:rollsize 设为 hdfs块大小 128mb

滚动文件夹:用处:一天滚动一个文件夹

可以配合hive分区 按天分区load数据就会很方便

a2.sources = r2
a2.sinks = k2
a2.channels = c2a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/test.log
a2.sources.r2.shell = /bin/bash -c #解析方式a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.56.20:9000/flume/%Y%m%d/%Ha2.sinks.k2.hdfs.round = true # 按照时间滚动文件夹
a2.sinks.k2.hdfs.roundValue = 1 # 多长时间创建一个新文件夹
a2.sinks.k2.hdfs.roundUnit = hour # 重新定义时间单位a2.sinks.k2.hdfs.useLocalTimeStamp = true # 使用本地时间戳a2.sinks.k2.hdfs.batchSize = 1000 # 积攒多少个Event flush到hdfs一次
a2.sinks.k2.hdfs.fileType = DataStream # 设置文件类型
a2.sinks.k2.hdfs.rollIntreval = 60 # 多久生成一个新文件 s
a2.sinks.k2.hdfs.rollSize = 134217700 # 文件滚动与Event数量无关 设置大小比一个hdfs块128MB稍小
a2.sinks.k2.hdfs.rollCount = 0 # 文件滚动与Event数量无关a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

案例4:


监控多个文件

spooldir source

通过此源,您可以通过将要摄取的文件放入磁盘上的“Spooling”目录中来摄取数据。该源将监视指定目录中的新文件,并从出现的新文件中解析事件。事件解析逻辑是可插入的。将给定文件完全读入通道后,将其重命名以指示完成(或选择删除)。

与Exec源不同,此源是可靠的,即使Flume重新启动或终止,它也不会丢失数据。为了获得这种可靠性,必须仅将不可变的唯一命名的文件放入Spooling目录中。Flume尝试检测这些问题情况,如果违反这些条件,将返回失败:

如果将文件放入Spooling目录后写入文件,Flume将在其日志文件中打印错误并停止处理。
如果以后再使用文件名,Flume将在其日志文件中打印错误并停止处理。
为避免上述问题,将唯一的标识符(例如时间戳)添加到日志文件名称(当它们移到Spooling目录中时)可能会很有用。

尽管有此来源的可靠性保证,但是在某些情况下,如果发生某些下游故障,则事件可能会重复。这与Flume其他组件提供的保证是一致的。

a2.sources = r2
a2.sinks = k2
a2.channels = c2a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume/upload a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.56.20:9000/flume/%Y%m%d/%Ha2.sinks.k2.hdfs.round = true # 按照时间滚动文件夹
a2.sinks.k2.hdfs.roundValue = 1 # 多长时间创建一个新文件夹
a2.sinks.k2.hdfs.roundUnit = hour # 重新定义时间单位a2.sinks.k2.hdfs.useLocalTimeStamp = true # 使用本地时间戳a2.sinks.k2.hdfs.batchSize = 1000 # 积攒多少个Event flush到hdfs一次
a2.sinks.k2.hdfs.fileType = DataStream # 设置文件类型
a2.sinks.k2.hdfs.rollIntreval = 60 # 多久生成一个新文件 s
a2.sinks.k2.hdfs.rollSize = 134217700 # 文件滚动与Event数量无关 设置大小比一个hdfs块128MB稍小
a2.sinks.k2.hdfs.rollCount = 0 # 文件滚动与Event数量无关a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

先上传 后改名为已读

但是不能动态监控变化的文件

案例5:


监控动态多文件

1.7版本 Talldir实现断点续传

在通过Flume收集日志的业务场景中,一般都会遇到下面的情况,在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_20151015_10.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_20151015_11.log的文件。

这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

a1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = 第一个路径a1.sources.r1.positionFile = 路径

在这里插入图片描述
位置文件:
实现断点续传 json格式 inode(linux文件系统文件标识)记录了被监控文件位置信息
在这里插入图片描述


推荐阅读
  • 本文深入探讨了HTTP请求和响应对象的使用,详细介绍了如何通过响应对象向客户端发送数据、处理中文乱码问题以及常见的HTTP状态码。此外,还涵盖了文件下载、请求重定向、请求转发等高级功能。 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 采用IKE方式建立IPsec安全隧道
    一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 本文详细介绍如何利用已搭建的LAMP(Linux、Apache、MySQL、PHP)环境,快速创建一个基于WordPress的内容管理系统(CMS)。WordPress是一款流行的开源博客平台,适用于个人或小型团队使用。 ... [详细]
  • 如何配置Unturned服务器及其消息设置
    本文详细介绍了Unturned服务器的配置方法和消息设置技巧,帮助用户了解并优化服务器管理。同时,提供了关于云服务资源操作记录、远程登录设置以及文件传输的相关补充信息。 ... [详细]
  • 360SRC安全应急响应:从漏洞提交到修复的全过程
    本文详细介绍了360SRC平台处理一起关键安全事件的过程,涵盖从漏洞提交、验证、排查到最终修复的各个环节。通过这一案例,展示了360在安全应急响应方面的专业能力和严谨态度。 ... [详细]
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • 深入理解Redis的数据结构与对象系统
    本文详细探讨了Redis中的数据结构和对象系统的实现,包括字符串、列表、集合、哈希表和有序集合等五种核心对象类型,以及它们所使用的底层数据结构。通过分析源码和相关文献,帮助读者更好地理解Redis的设计原理。 ... [详细]
  • Python 异步编程:ASGI 服务器与框架详解
    自 Python 3.5 引入 async/await 语法以来,异步编程迅速崛起,吸引了大量开发者的关注。本文将深入探讨 ASGI(异步服务器网关接口)及其在现代 Python Web 开发中的应用,介绍主流的 ASGI 服务器和框架。 ... [详细]
  • 本文详细介绍了Ionic框架的使用方法及其与Angular的集成。Ionic框架是一个强大的前端开发工具,适用于构建跨平台的移动应用程序。文章将探讨如何引入必要的CSS和JavaScript文件,并解释bundle.js中包含的核心功能,如路由等。 ... [详细]
  • Python处理Word文档的高效技巧
    本文详细介绍了如何使用Python处理Word文档,涵盖从基础操作到高级功能的各种技巧。我们将探讨如何生成文档、定义样式、提取表格数据以及处理超链接和图片等内容。 ... [详细]
  • 使用Solr从MySQL导入数据构建全量索引
    为了更好地掌握Solr的各项功能,本文档将在本地Windows环境中演示如何从MySQL数据库中导入数据至Solr,并构建全量索引。这将有助于开发者熟悉Solr的数据处理流程,尤其是在无法直接在生产服务器上进行实践的情况下。 ... [详细]
author-avatar
手机用户2502907603
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有