热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

笨鸟的平凡之路FLUME

笨鸟的平凡之路-FLUME,Go语言社区,Golang程序员人脉社
Flume的简介

Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
◆ Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
◆ 一般的采集需求,通过对flume的简单配置即可实现
◆ Flume针对特殊场景也具备良好的自定义扩展能力,
因此,flume可以适用于大部分的日常数据采集场景

Flume的运行机制

1、 Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成
2、 每一个agent相当于一个数据传递员,内部有三个组件:
a) Source:采集组件,用于跟数据源对接,以获取数据
b) Channel:传输通道组件,用于从source将数据传递到sink
c) Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据
在这里插入图片描述

Flume的安装

Flume的安装在此不赘述.

Flume的简单测试(以CDH版为例)

1.需要在CDH的界面查看Flume的Agent代理名称,这个代理名称涉及后续conf文件的编辑,非常重要
步骤:CDH主界面->Flume->配置->代理名称
在这里插入图片描述

2.创建被监控文件以及配置文件的编写(以与kafka集成为例)
2.1:创建被监控的文件
在装有Flume的节点上创建文件
vi /opt/a.txt
2.2:需要在安装了Flume的节点上编写conf配置文件,配置文件的具体位置由自己定义,自己记得就好.
示例:vi /home/testFlume.conf(必须以.conf结尾)
文件内容如下,注意这里的tier1就是在CDH上查看到的代理名称,因人而异,有的是a1等等:
tier1.sources = r1
tier1.sinks = k1
tier1.channels = c1

tier1.sources.r1.type = exec
tier1.sources.r1.command = tail -F /opt/a.txt
tier1.sources.r1.shell = /bin/sh -c

tier1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.k1.topic = test
tier1.sinks.k1.brokerList = kafka1:9092,kafka2:9092
tier1.sinks.k1.requiredAcks = 1
tier1.sinks.k1.batchSize = 20
tier1.sinks.k1.channel = c1

tier1.channels.c1.type = memory
tier1.channels.c1.capacity = 1000000
tier1.channels.c1.transactiOnCapacity= 10000

tier1.sources.r1.channels = c1
tier1.sinks.k1.channel = c1

2.3:在装有kafka的节点创建topic(与配置文件中保持一致)
kafka-topics --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 1 --partitions 1 --topic test

2.4:创建消费者
(从头开始消费)
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092 --from-beginning --topic test
(非从头开始消费)
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092 --topic test

2.5:在装有flume的节点启动flume
flume-ng agent -n tier1 -c /opt/cm-5.11.2/run/cloudera-scm-agent/process/767-flume-AGENT/ -f /home/testFlume.conf -Dflume.root.logger=INFO,console

注释:
tier1:是agent的代理名称
/opt/cm-5.11.2/run/cloudera-scm-agent/process/767-flume-AGENT/:是通过命令行 find / -name flume.conf 找到的(如果不是CDH版的找到flume的conf目录,一般是/opt/apache-flume-1.6.0-bin/conf/)
/home/testFlume.conf:自己编写的配置文件

2.6:向被监控的文件中追加数据
echo hello flume >>/opt/a.txt

2.7:观察装有kafka节点的消费者控制台是否hello flume

Flume数据丢失和数据重复

1.提高flume的版本,1.7版本之后,可以解决断点续传和数据重复问题
-----1.7版本的flume提供Taildir Source方式
它定期以JSON格式写入给定位置文件上每个文件的最后读取位置。如果Flume由于某种原因stop或down,它可以从文件position处重新开始tail。
具体的配置:
pro.sources = s1
pro.channels = c1
pro.sinks = k1

pro.sources.s1.type = TAILDIR
pro.sources.s1.positiOnFile= /home/dev/flume/flume-1.8.0/log/taildir_position.json
pro.sources.s1.filegroups = f1
pro.sources.s1.filegroups.f1 = /home/dev/log/moercredit/logstash.log
pro.sources.s1.headers.f1.headerKey1 = aaa
pro.sources.s1.fileHeader = true

pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactiOnCapacity= 100

pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
pro.sinks.k1.kafka.topic = moercredit_log_test
pro.sinks.k1.kafka.bootstrap.servers = cdh1:9092,cdh2:9092,cdh3:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy

pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1

2.如果不是1.7版本的flume
将channels.type方式从memory改成file
memory的形式是基于内存缓冲数据
file的形式是设置检查点
两者的区别:
type选择memory时Channel的性能最好,但是如果Flume进程意外挂掉可能会丢失数据;type选择file时Channel的容错性更好,但是性能上会比memory channel差。
具体配置:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /xuyi/a.txt
a1.sources.r1.shell = /bin/sh -c

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = kafka1:9092,kafka2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

#a1.channels.c1.type = memory
a1.channels.c1.type = file
#这两个文件夹最好手动创建
a1.channels.c1.checkpointDir = /opt/checkpointDir
a1.channels.c1.dataDirs = /opt/dataDirs

a1.channels.c1.capacity = 1000000
a1.channels.c1.transactiOnCapacity= 10000

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

参考地址:
http://rdc.hundsun.com/portal/article/941.html
https://blog.csdn.net/Abysscarry/article/details/89420560


推荐阅读
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • 本文介绍了如何使用Hive分析用户最长连续登录天数的方法。首先对数据进行排序,然后计算相邻日期之间的差值,接着按用户ID分组并累加连续登录天数,最后求出每个用户的最大连续登录天数。此外,还探讨了该方法在其他领域的应用,如股票市场中最大连续涨停天数的分析。 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及流式消费变化数据的能力。应用场景近实时数据摄取Hudi支持插入、更新和删除数据的能力。您 ... [详细]
  • javascript分页类支持页码格式
    前端时间因为项目需要,要对一个产品下所有的附属图片进行分页显示,没考虑ajax一张张请求,所以干脆一次性全部把图片out,然 ... [详细]
  • Python 实战:异步爬虫(协程技术)与分布式爬虫(多进程应用)深入解析
    本文将深入探讨 Python 异步爬虫和分布式爬虫的技术细节,重点介绍协程技术和多进程应用在爬虫开发中的实际应用。通过对比多进程和协程的工作原理,帮助读者理解两者在性能和资源利用上的差异,从而在实际项目中做出更合适的选择。文章还将结合具体案例,展示如何高效地实现异步和分布式爬虫,以提升数据抓取的效率和稳定性。 ... [详细]
author-avatar
小子少耍酷10
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有