热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flume实时采集mysql数据到kafka中并输出

centos7(运行于vbox虚拟机)flume1.9.0(flume-ng-sql-source插件版本1.5.3)jdk1.8kafka(版本忘了后续更新)zookeeper(
  • centos7(运行于vbox虚拟机)
  • flume1.9.0(flume-ng-sql-source插件版本1.5.3)
  • jdk1.8
  • kafka(版本忘了后续更新)
  • zookeeper(版本忘了后续更新)
  • mysql5.7.24
  • xshell

准备工作

flume安装

暂略,后续更新

flume简介

Apache Flume是一个分布式的、可靠的、可用的系统,用于有效地收集、聚合和将大量日志数据从许多不同的源移动到一个集中的数据存储。在大数据生态圈中,flume经常用于完成数据采集的工作。

flume实时采集mysql数据到kafka中并输出

其实时性很高,延迟大约1-2s,可以做到准实时。

又因为mysql是程序员常用的数据库,所以以flume实时采集mysql数据库为例子。要了解flume如何采集数据,首先要初探其架构:

Flume 运行的核心是 Agent。Flume以agent为最小的独立运行单位。一个agent就是一个JVM。它是一个完整的数据收集工具,含有三个核心组件,分别是

source、 channel、 sink。通过这些组件, Event 可以从一个地方流向另一个地方,如下图所示。

flume实时采集mysql数据到kafka中并输出

三大组件

source

Source是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到事件(event) 里,然后将事件推入Channel中。

Flume提供了各种source的实现,包括Avro Source、Exce Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source等。如果内置的Source无法满足需要, Flume还支持自定义Source。

可以看到原生flume的source并不支持sql source,所以我们需要添加插件,后续将提到如何添加。

channel

Channel是连接Source和Sink的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到Sink处理完该事件。

Flume对于Channel,则提供了Memory Channel、JDBC Chanel、File Channel,etc。

  • MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
  • MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
  • FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。

sink

Flume Sink取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

Flume Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

这个例子中,我使用了kafka作为sink

下载flume-ng-sql-source插件

到这里下载flume-ng-sql-source,最新版本是1.5.3。

下载完后解压,我通过idea运行程序,使用maven打包为jar包,改名为flume-ng-sql-source-1.5.3.jar

编译完的jar包要放在放到FLUME_HOME/lib下,FLUME_HOME是自己linux下flume的文件夹,比如我的是 /opt/install/flume

kafka安装

我们使用flume将数据采集到kafka, 并启动一个kafak的消费监控,就能看到实时数据了

jdk1.8安装

暂略,后续更新

zookeeper安装

暂略,后续更新

kafka安装

暂略,后续更新

mysql5.7.24安装

暂略,后续更新

flume抽取mysql数据到kafka实战

新建一个数据库和表

在完成上述的安装工作后就可以开始着手实现demo了

首先我们要抓取mysql的数据,那么必然需要一个数据库和表,并且要记住这个数据库和表的名字,之后这些信息要写入flume的配置文件。

创建数据库:

create database test

创建表:

create table fk(
id int UNSIGNED AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
PRIMARY KEY ( id )
);

新增配置文件(重要)

cd 到flume的conf文件夹中,新增一个文件mysql-flume.conf

[root@localhost ~]# cd /opt/install/flume
[root@localhost flume]# ls
bin        conf      doap_Flume.rdf  lib      NOTICE     RELEASE-NOTES  tools
CHANGELOG  DEVNOTES  docs            LICENSE  README.md  status
[root@localhost flume]# cd conf
[root@localhost conf]# ls
flume-conf.properties.template  log4j.properties
flume-env.ps1.template          mysql-connector-java-5.1.35
flume-env.sh                    mysql-connector-java-5.1.35.tar.gz
flume-env.sh.template           mysql-flume.conf

注:mysql-flume.conf本来是没有的,是我生成的,具体配置如下所示

在这个文件中写入:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = xxxxxxxx
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
a1.sources.src-1.status.file.path = /opt/install/flume/status
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select `id`, `name` from fk
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactiOnCapacity= 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testTopic
a1.sinks.k1.brokerList = 10.100.4.6:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

这是我的文件,其中一些隐私信息已被我用其他字符串替代了,在写mysql-flume.conf时你可以复制上面的一段代码。下面是这段代码的详细注释,你可以更加带注释版本的代码来修改自己的conf文件

# a1表示agent的名称
# source是a1的输入源
# channels是缓冲区
# sinks是a1输出目的地,本例子sinks使用了kafka
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# 连接mysql的一系列操作,youhost改为你虚拟机的ip地址,可以通过ifconfig或者ip addr查看
# url中要加入?useUnicode=true&characterEncoding=utf-8&useSSL=false,否则有可能连接失败
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://youhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Hibernate Database connection properties
# mysql账号,一般都是root
a1.sources.src-1.hibernate.connection.user = root
# 填入你的mysql密码
a1.sources.src-1.hibernate.connection.password = xxxxxxxx
a1.sources.src-1.hibernate.connection.autocommit = true
# mysql驱动
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
# 驱动版本过低会无法使用,驱动安装下文会提及
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
# 存放status文件
a1.sources.src-1.status.file.path = /opt/install/flume/status
a1.sources.src-1.status.file.name = sqlSource.status
# Custom query
a1.sources.src-1.start.from = 0
# 填写需要采集的数据表信息,你也可以使用下面的方法:
# agent.sources.sql-source.table =table_name
# agent.sources.sql-source.columns.to.select = *
a1.sources.src-1.custom.query = select `id`, `name` from fk
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10

################################################################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactiOnCapacity= 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

################################################################
# 使用kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 这个项目中你创建的或使用的topic名字
a1.sinks.k1.topic = testTopic
# kafka集群,broker列表,由于我没有使用集群所以只有一个
# 如果你搭建了集群,代码如下:agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
a1.sinks.k1.brokerList = 10.100.4.6:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

添加mysql驱动到flume的lib目录下

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.35.tar.gz

tar xzf mysql-connector-java-5.1.35.tar.gz

cp mysql-connector-java-5.1.35-bin.jar /你flume的位置/lib/

启动zookeeper

启动kafka前要启动zookeeper

cd 到zookeeper的bin目录下

启动:

./zkServer.sh start

等待运行

./zkCli.sh

启动kafka

xshell中打开一个新窗口,cd到kafka目录下,启动kafka

bin/kafka-server-start.sh config/server.properties &

新建一个topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

注1:testTopic就是你使用的topic名称,这个和上文mysql-flume.conf里的内容是对应的。

注2:可以使用bin/kafka-topics.sh --list --zookeeper localhost:2181来查看已创建的topic。

启动flume

xshell中打开一个新窗口,cd到flume目录下,启动flume

bin/flume-ng agent -n a1 -c conf -f conf/mysql-flume.conf -Dflume.root.logger=INFO,console

等待他运行,同时我们可以打开一个新窗口连接数据库,使用我们新建的test数据库和fk表。

实时采集数据

flume会实时采集数据到kafka中,我们可以启动一个kafak的消费监控,用于查看mysql的实时数据

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning

这时就可以查看数据了,kafka会打印mysql中的数据

然后我们更改数据库中的一条数据,新读取到的数据也会变更

before:

flume实时采集mysql数据到kafka中并输出

after:

flume实时采集mysql数据到kafka中并输出


推荐阅读
  •        在搭建Hadoop环境之前,请先阅读如下博文,把搭建Hadoop环境之前的准备工作做好,博文如下:       1、CentOS6.7下安装JDK,地址:http:b ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 如何实现织梦DedeCms全站伪静态
    本文介绍了如何通过修改织梦DedeCms源代码来实现全站伪静态,以提高管理和SEO效果。全站伪静态可以避免重复URL的问题,同时通过使用mod_rewrite伪静态模块和.htaccess正则表达式,可以更好地适应搜索引擎的需求。文章还提到了一些相关的技术和工具,如Ubuntu、qt编程、tomcat端口、爬虫、php request根目录等。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文由编程笔记小编整理,介绍了PHP中的MySQL函数库及其常用函数,包括mysql_connect、mysql_error、mysql_select_db、mysql_query、mysql_affected_row、mysql_close等。希望对读者有一定的参考价值。 ... [详细]
  • Oracle分析函数first_value()和last_value()的用法及原理
    本文介绍了Oracle分析函数first_value()和last_value()的用法和原理,以及在查询销售记录日期和部门中的应用。通过示例和解释,详细说明了first_value()和last_value()的功能和不同之处。同时,对于last_value()的结果出现不一样的情况进行了解释,并提供了理解last_value()默认统计范围的方法。该文对于使用Oracle分析函数的开发人员和数据库管理员具有参考价值。 ... [详细]
  • 本文详细介绍了在Centos7上部署安装zabbix5.0的步骤和注意事项,包括准备工作、获取所需的yum源、关闭防火墙和SELINUX等。提供了一步一步的操作指南,帮助读者顺利完成安装过程。 ... [详细]
  • 本文分享了一位Android开发者多年来对于Android开发所需掌握的技能的笔记,包括架构师基础、高级UI开源框架、Android Framework开发、性能优化、音视频精编源码解析、Flutter学习进阶、微信小程序开发以及百大框架源码解读等方面的知识。文章强调了技术栈和布局的重要性,鼓励开发者做好学习规划和技术布局,以提升自己的竞争力和市场价值。 ... [详细]
  • Hadoop2.6.0 + 云centos +伪分布式只谈部署
    3.0.3玩不好,现将2.6.0tar.gz上传到usr,chmod-Rhadoop:hadophadoop-2.6.0,rm掉3.0.32.在etcp ... [详细]
  • POCOCLibraies属于功能广泛、轻量级别的开源框架库,它拥有媲美Boost库的功能以及较小的体积广泛应用在物联网平台、工业自动化等领域。POCOCLibrai ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • 1、概述首先和大家一起回顾一下Java消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:然后在另一篇博客《Java消息队列-ActiveMq实战》中 ... [详细]
  • 伸缩性|发生_分布式文件系统设计,该从哪些方面考虑?
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了分布式文件系统设计,该从哪些方面考虑?相关的知识,希望对你有一定的参考价值。点击上方关注“ ... [详细]
author-avatar
Fier田野莎莎
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有