热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

日志处理流程:Flume+MapReduce+Hive+Sqoop+MySQL

本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。

(1)使用Flume-ng的spooldir类型的source来监控Linux文件系统上的一个目录,并使用hdfs类型的sink将日志数据传输到HDFS。以下是Flume-ng agent的配置文件a4.conf的内容:

# 定义agent名,source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1

# 具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/Documents/logs

# 具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactiOnCapacity= 100

# 定义拦截器,为消息添加时间戳
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# 具体定义sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/flume/%Y%m%d
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
# 不按照条数生成文件
a4.sinks.k1.hdfs.rollCount = 0
# HDFS上的文件达到128M时生成一个文件
a4.sinks.k1.hdfs.rollSize = 134217728
# HDFS上的文件达到60秒生成一个文件
a4.sinks.k1.hdfs.rollInterval = 60

# 组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

在Flume的根目录下,使用以下命令启动Flume监听: bin/flume-ng agent -n a4 -c conf -f conf/a4.conf -Dflume.root.logger=INFO,console。一旦目标目录下新增内容,这些内容将被自动收集到HDFS的指定目录。

(2)使用MapReduce、Hive、Sqoop和MySQL进行数据处理。首先,创建一个Hive外部分区表bbslog,指向清洗后的数据目录bbslog_cleaned。接着,使用MapReduce程序清洗位于/flume目录下的原始日志数据,并将结果保存到/bbslog_cleaned目录。然后,告知bbslog表新的分区,分析清洗后的日志数据,计算PV、UV、新注册用户数和VIP用户,并将结果分别保存到四个Hive表中。最后,使用Sqoop将这四个Hive表中的数据导出到MySQL的bbslog_out库中的相应表中。以下是执行脚本daily.sh的内容:

CURRENT=`date +%Y%m%d`

# 使用MapReduce程序清洗日志数据
/usr/hadoop/hadoop-2.2.0/bin/hadoop jar /root/Documents/cleaner.jar /flume/$CURRENT /bbslog_cleaned/$CURRENT

# 为Hive外部分区表bbslog添加分区
/usr/hive/apache-hive-0.13.0-bin/bin/hive -e "alter table bbslog add partition(logdate=$CURRENT) location '/bbslog_cleaned/$CURRENT'"

# 计算当天的PV
/usr/hive/apache-hive-0.13.0-bin/bin/hive -e "create table pv_$CURRENT row format delimited fields terminated by '\t' as select $CURRENT, count(*) from bbslog where logdate=$CURRENT"

# 计算当天的UV
/usr/hive/apache-hive-0.13.0-bin/bin/hive -e "create table uv_$CURRENT row format delimited fields terminated by '\t' as select $CURRENT, count(distinct ip) from bbslog where logdate=$CURRENT"

# 计算当天的新注册用户数
/usr/hive/apache-hive-0.13.0-bin/bin/hive -e "create table newregister_$CURRENT row format delimited fields terminated by '\t' as select $CURRENT, count(*) from bbslog where logdate=$CURRENT and instr(url, 'member.php?mod=register')>0"

# 计算当天的VIP用户
/usr/hive/apache-hive-0.13.0-bin/bin/hive -e "create table vip_$CURRENT row format delimited fields terminated by '\t' as select $CURRENT, ip, count(*) as hits from bbslog where logdate=$CURRENT group by ip having hits > 20 order by hits desc limit 10"

# 将pv_$CURRENT表中的数据导出到MySQL的pv表
/usr/sqoop/sqoop-1.4.4.bin__hadoop-2.0.4-alpha/bin/sqoop export --connect jdbc:mysql://192.168.1.10:3306/bbslog_out --username root --password 123 --export-dir "/user/hive/warehouse/pv_$CURRENT" --table pv --fields-terminated-by '\t'

# 将uv_$CURRENT表中的数据导出到MySQL的uv表
/usr/sqoop/sqoop-1.4.4.bin__hadoop-2.0.4-alpha/bin/sqoop export --connect jdbc:mysql://192.168.1.10:3306/bbslog_out --username root --password 123 --export-dir "/user/hive/warehouse/uv_$CURRENT" --table uv --fields-terminated-by '\t'

# 将newregister_$CURRENT表中的数据导出到MySQL的新注册用户表
/usr/sqoop/sqoop-1.4.4.bin__hadoop-2.0.4-alpha/bin/sqoop export --connect jdbc:mysql://192.168.1.10:3306/bbslog_out --username root --password 123 --export-dir "/user/hive/warehouse/newregister_$CURRENT" --table newRegister --fields-terminated-by '\t'

# 将vip_$CURRENT表中的数据导出到MySQL的vip表
/usr/sqoop/sqoop-1.4.4.bin__hadoop-2.0.4-alpha/bin/sqoop export --connect jdbc:mysql://192.168.1.10:3306/bbslog_out --username root --password 123 -m 1 --export-dir "/user/hive/warehouse/vip_$CURRENT" --table vip --fields-terminated-by '\t'

推荐阅读
  • 本文探讨了使用Python实现监控信息收集的方法,涵盖从基础的日志记录到复杂的系统运维解决方案,旨在帮助开发者和运维人员提升工作效率。 ... [详细]
  • 本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 简化报表生成:EasyReport工具的全面解析
    本文详细介绍了EasyReport,一个易于使用的开源Web报表工具。该工具支持Hadoop、HBase及多种关系型数据库,能够将SQL查询结果转换为HTML表格,并提供Excel导出、图表显示和表头冻结等功能。 ... [详细]
  • databasesync适配openGauss使用指导书
    一、database-sync简介database-sync作为一种开源辅助工具,用于数据库之间的表同步,更确切的说法是复制,可以从一个数据库复制表到另一个数据库该工具支持的功能如 ... [详细]
  • 深入浅出:Hadoop架构详解
    Hadoop作为大数据处理的核心技术,包含了一系列组件如HDFS(分布式文件系统)、YARN(资源管理框架)和MapReduce(并行计算模型)。本文将通过实例解析Hadoop的工作原理及其优势。 ... [详细]
  • 本文介绍了在解决Hive表中复杂数据结构平铺化问题后,如何通过创建视图来准确计算广告日志的曝光PV,特别是针对用户对应多个标签的情况。同时,详细探讨了UDF的使用方法及其在实际项目中的应用。 ... [详细]
  • Hadoop MapReduce 实战案例:手机流量使用统计分析
    本文通过一个具体的Hadoop MapReduce案例,详细介绍了如何利用MapReduce框架来统计和分析手机用户的流量使用情况,包括上行和下行流量的计算以及总流量的汇总。 ... [详细]
  • 大数据领域的职业路径与角色解析
    本文将深入探讨大数据领域的各种职业和工作角色,帮助读者全面了解大数据行业的需求、市场趋势,以及从入门到高级专业人士的职业发展路径。文章还将详细介绍不同公司对大数据人才的需求,并解析各岗位的具体职责、所需技能和经验。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • 本文探讨了在渗透测试中信息收集阶段使用的几种端口扫描技术,包括nmap、masscan、socket、telnet及nc等工具的应用与比较。 ... [详细]
  • 本文介绍了Hive作为基于Hadoop的数据仓库工具的核心概念,包括其基本功能、使用理由、特点以及与Hadoop的关系。同时,文章还探讨了Hive相较于传统关系型数据库的不同之处,并展望了Hive的发展前景。 ... [详细]
  • 初探Hadoop:第一章概览
    本文深入探讨了《Hadoop》第一章的内容,重点介绍了Hadoop的基本概念及其如何解决大数据处理中的关键挑战。 ... [详细]
  • HBase 数据复制与灾备同步策略
    本文探讨了HBase在企业级应用中的数据复制与灾备同步解决方案,包括存量数据迁移及增量数据实时同步的方法。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
author-avatar
手机用户2502940417_253
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有