热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

用logstash作数据的聚合统计

用logstash作数据的聚合统计以spark-streaming处理消费数据,统计日志经sparksql存储在mysql中日志写入方式为appendvalwordsDataFram

用logstash 作数据的聚合统计

以spark-streaming 处理消费数据,统计日志经spark sql存储在mysql中

日志写入方式为append
val wordsDataFrame = rdd.toDF("supplier", "type", "domain", "pdate", "count", "idate")
wordsDataFrame
.write
.format("jdbc")
.mode("append").options(mysql_conf)
.save()
因为spark-streaming是批处理服务,这些日志只是中间结果,会有大量小批次的统计信息
表名statistics_temp
id(自增id) domain idate count
1 www.baidu.com 2018-06-13 1
2 www.baidu.com 2018-06-13 2
3 www.taobao.com 2018-06-13 3

求每日每domain的数据还需再一次作聚合

原本的构想是通过logstash将日志导入es,elk方案,kibana直接划分时间间隔聚合出结果展示

现在有个需求是要存入每日的量级入一张表,直接在mysql展示

不考虑从kibana聚合再写入es,statistics_temp数据量较大,每次查询都从statistics_temp用聚合显然不可行

因此需要由这张临时表,导出完真正的有效表statistics,这种需求很常见,通常是记录时间点,定时执行,读取时间点后的数据,统计再upsert

导出表名statistics(domain+idate 配置联合唯一索引,DUPLICATE才生效)
id(自增id) domain idate count
1 www.baidu.com 2018-06-13 3
3 www.taobao.com 2018-06-13 3

最近作数据流程设计搭建,不想写这种代码,因为对logstash比较熟悉,因此便想试试logstash的方案,最终实验可行

环境
logstash 5.5
mysql 5.7

聚合 需要官方插件
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html#plugins-filters-aggregate-example2
写入 mysql需要第三方插件
https://github.com/theangryangel/logstash-output-jdbc

1安装插件
logstash5.5 默认不含logstash-filters-aggregate 需单独安装,其他版本未知
logstash-plugins install logstash-filters-aggregate
第三方插件,默认不含,需单独安装
logstash-plugins install logstash-output-jdbc

2执行
由于jdbc_fetch_size参数不生效,因此会拿出大量数据(mysql-connector-java-5.1.44-bin.jar,默认的数据量)需要调整堆大小,避免oom

export LS_JAVA_OPTS="-Xms4g -Xmx8g"
logstash -f /logstash/statistic.conf

重点是/logstash/statistic.conf的内容

input {
jdbc {
jdbc_driver_library => "/logstash/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://host:3306/db1"
jdbc_user => "root"
jdbc_password => "root"
schedule => "* * * * *"
use_column_value => true
tracking_column => "id"
jdbc_fetch_size => 2000
tracking_column_type => "numeric"
statement => "SELECT * from statistics WHERE id > :sql_last_value"
clean_run => false
record_last_run => true
last_run_metadata_path => "/home/logstash/.statistic"
}
}

filter {
aggregate {
task_id => "%{domain}_%{idate}"
code => "
map['domain'] = event.get('domain')
map['idate'] = event.get('idate')
map['count'] ||= 0
map['count'] += event.get('count')
event.cancel()
"
push_previous_map_as_event => true
}
}

output {
stdout{
codec=>rubydebug{}
}
jdbc {
driver_jar_path => "/logstash/mysql-connector-java-5.1.44-bin.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://host:3306/db1?user=root&password=root"
statement => [ "INSERT INTO `bbs`.`statistics2` (`domain`,`idate`,`count`) VALUES(?,?,?) ON DUPLICATE KEY UPDATE `count` = `count` + ?;", "domain","idate","count","count" ]
}
}


tracking_column => "id"
tracking_column_type => "numeric"
statement => "SELECT * from statistics WHERE id > :sql_last_value"
last_run_metadata_path => "/home/logstash/.statistic"

以上四个参数,配置记录点的信息

该例子为自增id numeric类型
记录文件示例
bash-4.3# cat /home/logstash/.statistic
--- 127986843

若是时间类型
tracking_column => "updated_on"
tracking_column_type => "timestamp"
statement => "SELECT * from user WHERE updated_on > :sql_last_value"
last_run_metadata_path => "/home/logstash/.statistic"

记录文件示例
bash-4.3# cat /home/logstash/.statistic
--- 2018-06-13 10:57:59.000000000 +00:00

可以通过,停logstash服务,改记录文件,再启动logstash服务,来重新聚合统计某记录点后的数据。


示例
count 对比
statistics_temp:
mysql> select count(count),idate from statistics_temp where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+--------------+------------+
| count(count) | idate |
+--------------+------------+
| 15290 | 2018-06-16 |
| 27176 | 2018-06-17 |
| 18997 | 2018-06-18 |
| 21785 | 2018-06-19 |
+--------------+------------+
statistics:
mysql> select count(count),idate from statistics where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+--------------+------------+
| count(count) | idate |
+--------------+------------+
| 532 | 2018-06-16 |
| 533 | 2018-06-17 |
| 534 | 2018-06-18 |
| 535 | 2018-06-19 |
+--------------+------------+

sum 对比
statistics_temp
mysql> select sum(count),idate from statistics_temp where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+------------+------------+
| sum(count) | idate |
+------------+------------+
| 390499 | 2018-06-16 |
| 705807 | 2018-06-17 |
| 462147 | 2018-06-18 |
| 600657 | 2018-06-19 |
+------------+------------+
statistics
mysql> select sum(count),idate from statistics where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+------------+------------+
| sum(count) | idate |
+------------+------------+
| 390499 | 2018-06-16 |
| 705807 | 2018-06-17 |
| 462147 | 2018-06-18 |
| 600657 | 2018-06-19 |
+------------+------------+

相比statistics_temp,statistics count减少,sum一致,聚合成功


推荐阅读
  • MySQL初级篇——字符串、日期时间、流程控制函数的相关应用
    文章目录:1.字符串函数2.日期时间函数2.1获取日期时间2.2日期与时间戳的转换2.3获取年月日、时分秒、星期数、天数等函数2.4时间和秒钟的转换2. ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • MicrosoftDeploymentToolkit2010部署培训实验手册V1.0目录实验环境说明3实验环境虚拟机使用信息3注意:4实验手册正文说 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 本文讨论了在进行 MySQL 数据迁移过程中遇到的所有 .frm 文件报错的问题,并提供了详细的解决方案和建议。 ... [详细]
  • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 在CentOS 7环境中安装配置Redis及使用Redis Desktop Manager连接时的注意事项与技巧
    在 CentOS 7 环境中安装和配置 Redis 时,需要注意一些关键步骤和最佳实践。本文详细介绍了从安装 Redis 到配置其基本参数的全过程,并提供了使用 Redis Desktop Manager 连接 Redis 服务器的技巧和注意事项。此外,还探讨了如何优化性能和确保数据安全,帮助用户在生产环境中高效地管理和使用 Redis。 ... [详细]
  • Unity与MySQL连接过程中出现的新挑战及解决方案探析 ... [详细]
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • 一个建表一个执行crud操作建表代码importandroid.content.Context;importandroid.database.sqlite.SQLiteDat ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
author-avatar
歪果仁
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有