热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flumemysql正则_利用Flume将MySQL表数据准实时抽取到HDFS

一、为什么要用到Flume在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行

一、为什么要用到Flume

在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问。这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性。Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题。就像实验中所做的,每天定时增量抽取数据一次。

Flume是一个海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。Flume以流方式处理数据,可作为代理持续运行。当新的数据可用时,Flume能够立即获取数据并输出至目标,这样就可以在很大程度上解决实时性问题。

Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能。下面简单介绍Flume,并详细说明如何配置Flume将MySQL表数据准实时抽取到HDFS。

二、Flume简介

1. Flume的概念

Flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到HDFS,简单来说flume就是收集日志的,其架构如图1所示。

e0c978d15c420c869e0e4865bc71587a.png

图1

2. Event的概念

在这里有必要先介绍一下Flume中event的相关概念:Flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume再删除自己缓存的数据。

在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?Event将传输的数据进行封装,是Flume传输数据的基本单位,如果是文本文件,通常是一行记录。Event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。Event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

3. Flume架构介绍

Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent。Agent本身是一个Java进程,运行在日志收集节点——所谓日志收集节点就是服务器节点。 Agent里面包含3个核心的组件:source、channel和sink,类似生产者、仓库、消费者的架构。

Source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。

Channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。

Sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

4. Flume的运行机制

Flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据输入的source,一个是数据输出的sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方,例如HDFS等。注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

三、安装Hadoop和Flume

我的实验在HDP 2.5.0上进行,HDP安装中包含Flume,只要配置Flume服务即可。HDP的安装步骤参见“HAWQ技术解析(二) —— 安装部署”

四、配置与测试

1. 建立MySQL数据库表

建立测试表并添加数据。

use test;

create table  wlslog

(id         int not null,

time_stamp varchar(40),

category   varchar(40),

type       varchar(40),

servername varchar(40),

code       varchar(40),

msg        varchar(40),

primary key ( id )

);

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to standby');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to starting');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to admin');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to resuming');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started weblogic adminserver');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to running');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

commit;

2. 建立相关目录与文件

(1)创建本地状态文件

mkdir -p /var/lib/flume

cd /var/lib/flume

touch sql-source.status

chmod -R 777 /var/lib/flume

(2)建立HDFS目标目录

hdfs dfs -mkdir -p /flume/mysql

hdfs dfs -chmod -R 777 /flume/mysql

3. 准备JAR包

从http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。

cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/

将MySQL JDBC驱动JAR包也复制到Flume库目录。

cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar

4. 建立HAWQ外部表

create external table ext_wlslog

(id         int,

time_stamp varchar(40),

category   varchar(40),

type       varchar(40),

servername varchar(40),

code       varchar(40),

msg        varchar(40)

) location ('pxf://mycluster/flume/mysql?profile=hdfstextmulti') format 'csv' (quote=e'"');

5. 配置Flume

在Ambari -> Flume -> Configs -> flume.conf中配置如下属性:

agent.channels.ch1.type = memory

agent.sources.sql-source.channels = ch1

agent.channels = ch1

agent.sinks = HDFS

agent.sources = sql-source

agent.sources.sql-source.type = org.keedio.flume.source.SQLSource

agent.sources.sql-source.connection.url = jdbc:mysql://172.16.1.127:3306/test

agent.sources.sql-source.user = root

agent.sources.sql-source.password = 123456

agent.sources.sql-source.table = wlslog

agent.sources.sql-source.columns.to.select = *

agent.sources.sql-source.incremental.column.name = id

agent.sources.sql-source.incremental.value = 0

agent.sources.sql-source.run.query.delay=5000

agent.sources.sql-source.status.file.path = /var/lib/flume

agent.sources.sql-source.status.file.name = sql-source.status

agent.sinks.HDFS.channel = ch1

agent.sinks.HDFS.type = hdfs

agent.sinks.HDFS.hdfs.path = hdfs://mycluster/flume/mysql

agent.sinks.HDFS.hdfs.fileType = DataStream

agent.sinks.HDFS.hdfs.writeFormat = Text

agent.sinks.HDFS.hdfs.rollSize = 268435456

agent.sinks.HDFS.hdfs.rollInterval = 0

agent.sinks.HDFS.hdfs.rollCount = 0

Flume在flume.conf文件中指定Source、Channel和Sink相关的配置,各属性描述如表1所示。

属性

描述

agent.channels.ch1.type

Agent的channel类型

agent.sources.sql-source.channels

Source对应的channel名称

agent.channels

Channel名称

agent.sinks

Sink名称

agent.sources

Source名称

agent.sources.sql-source.type

Source类型

agent.sources.sql-source.connection.url

数据库URL

agent.sources.sql-source.user

数据库用户名

agent.sources.sql-source.password

数据库密码

agent.sources.sql-source.table

数据库表名

agent.sources.sql-source.columns.to.select

查询的列

agent.sources.sql-source.incremental.column.name

增量列名

agent.sources.sql-source.incremental.value

增量初始值

agent.sources.sql-source.run.query.delay

发起查询的时间间隔,单位是毫秒

agent.sources.sql-source.status.file.path

状态文件路径

agent.sources.sql-source.status.file.name

状态文件名称

agent.sinks.HDFS.channel

Sink对应的channel名称

agent.sinks.HDFS.type

Sink类型

agent.sinks.HDFS.hdfs.path

Sink路径

agent.sinks.HDFS.hdfs.fileType

流数据的文件类型

agent.sinks.HDFS.hdfs.writeFormat

数据写入格式

agent.sinks.HDFS.hdfs.rollSize

目标文件轮转大小,单位是字节

agent.sinks.HDFS.hdfs.rollInterval

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件

agent.sinks.HDFS.hdfs.rollCount

当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件

表1

6. 运行Flume代理

保存上一步的设置,然后重启Flume服务,如图2所示。

2b240c9593645f13c33234f14a05f3fb.png

图2

重启后,状态文件已经记录了将最新的id值7,如图3所示。

d5e813d00d6f6ebf5aad2569a0bc95a3.png

图3

查看目标路径,生成了一个临时文件,其中有7条记录,如图4所示。

5420277b577be18363c1b9ff65cdd82b.png

图4

查询HAWQ外部表,结果也有全部7条数据,如图5所示。

74fd2ac4e9c81f160636fc31a2e5c3b6.png

图5

至此,初始数据抽取已经完成。

7. 测试准实时增量抽取

在源表中新增id为8、9、10的三条记录。

use test;

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(8,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(9,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(10,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');

commit;

5秒之后查询HAWQ外部表,从图6可以看到,已经查询出全部10条数据,准实时增量抽取成功。

65d3aa6d6005a8ee1116077e1664ae3f.png

图6

五、方案优缺点

利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。而与现在很火的canal比较,虽然不够灵活,但毕竟一行代码也不用写。再有该方案采用普通SQL轮询的方式实现,具有通用性,适用于所有关系库数据源。

这种方案的缺点与其优点一样突出,主要体现在以下几方面。

在源库上执行了查询,具有入侵性。

通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。

只能识别新增数据,检测不到删除与更新。

要求源库必须有用于表示增量的字段。

即便有诸多局限,但用Flume抽取关系库数据的方案还是有一定的价值,特别是在要求快速部署、简化编程,又能满足需求的应用场景,对传统的Sqoop方式也不失为一种有效的补充。

参考:

Flume架构以及应用介绍

Streaming MySQL Database Table Data to HDFS with Flume

how to read data from oracle using FLUME to kafka broker

https://github.com/keedio/flume-ng-sql-source



推荐阅读
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 该大学网站采用PHP和MySQL技术,在校内可免费访问某些外部收费资料数据库。为了方便学生校外访问,建议通过学校账号登录实现免费访问。具体方案可包括利用学校服务器作为代理,结合身份验证机制,确保合法用户在校外也能享受免费资源。 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 技术分享:使用 Flask、AngularJS 和 Jinja2 构建高效前后端交互系统
    技术分享:使用 Flask、AngularJS 和 Jinja2 构建高效前后端交互系统 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 在软件开发过程中,经常需要将多个项目或模块进行集成和调试,尤其是当项目依赖于第三方开源库(如Cordova、CocoaPods)时。本文介绍了如何在Xcode中高效地进行多项目联合调试,分享了一些实用的技巧和最佳实践,帮助开发者解决常见的调试难题,提高开发效率。 ... [详细]
  • 浏览器作为我们日常不可或缺的软件工具,其背后的运作机制却鲜为人知。本文将深入探讨浏览器内核及其版本的演变历程,帮助读者更好地理解这一关键技术组件,揭示其内部运作的奥秘。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
  • Hyperledger Fabric 1.4 节点 SDK 快速入门指南
    本文将详细介绍如何利用 Hyperledger Fabric 1.4 的 Node.js SDK 开发应用程序。通过最新版本的 Fabric Node.js SDK,开发者可以更高效地构建和部署基于区块链的应用,实现数据的安全共享和交易处理。文章将涵盖环境配置、SDK 安装、示例代码以及常见问题的解决方法,帮助读者快速上手并掌握核心功能。 ... [详细]
  • B站服务器故障影响豆瓣评分?别担心,阿里巴巴架构师分享预防策略与技术方案
    13日晚上,在视频观看高峰时段,B站出现了服务器故障,引发网友在各大平台上的广泛吐槽。这一事件导致了连锁反应,大量用户纷纷涌入A站、豆瓣和晋江等平台,给这些网站带来了突如其来的流量压力。为了防止类似问题的发生,阿里巴巴架构师分享了一系列预防策略和技术方案,包括负载均衡、弹性伸缩和容灾备份等措施,以确保系统的稳定性和可靠性。 ... [详细]
  • 在《Linux高性能服务器编程》一书中,第3.2节深入探讨了TCP报头的结构与功能。TCP报头是每个TCP数据段中不可或缺的部分,它不仅包含了源端口和目的端口的信息,还负责管理TCP连接的状态和控制。本节内容详尽地解析了TCP报头的各项字段及其作用,为读者提供了深入理解TCP协议的基础。 ... [详细]
  • 如何优化MySQL数据库性能以提升查询效率和系统稳定性 ... [详细]
author-avatar
塘迅人要更名_544
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有