热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

KSQL查询以简单聚合返回意外值

如何解决《KSQL查询以简单聚合返回意外值》经验,为你挑选了1个好方法。

我从针对由Kafka主题定义的KTable的KSQL查询中获得意外结果。KTABLE是“交易”,并且由压缩主题“ localhost.dbo.TradeHistory”支持。它应该包含由TradeId键控的股票交易的最新信息。主题的键是TradeId。每笔交易都有一个AccountId,我正在尝试构造一个查询以获取按帐户分组的交易金额的总和。

交易表的定义
ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');

...

ksql> describe extended Trades;

Name                 : TRADES
Type                 : TABLE
Key field            : TRADEID
Key format           : STRING
Timestamp field      : Not set - using 
Value format         : JSON
Kafka topic          : localhost.dbo.TradeHistory (partitions: 1, replication: 1)

Field     | Type
---------------------------------------
ROWTIME   | BIGINT           (system)
ROWKEY    | VARCHAR(STRING)  (system)
TRADEID   | INTEGER
ACCOUNTID | INTEGER
SPN       | INTEGER
AMOUNT    | DOUBLE
---------------------------------------

Local runtime statistics
------------------------
consumer-messages-per-sec:         0 consumer-total-bytes:      3709 consumer-total-messages:        39     last-message: 2019-10-12T20:52:16.552Z

(Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)
localhost.dbo.TradeHistory主题的配置
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory
Topic:localhost.dbo.TradeHistory    PartitionCount:1    ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100
    Topic: localhost.dbo.TradeHistory   Partition: 0    Leader: 1   Replicas: 1 Isr: 1

在我的测试中,我正在使用TradeId 2将消息添加到localhost.dbo.TradeHistory主题,该消息只是更改了交易量。仅金额被更新;AccountId仍为1。

localhost.dbo.TradeHistory主题中的消息
/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning

... (earlier values redacted) ...

2   {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"}
2   {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}

上面的主题转储显示(帐户1中的)贸易额2从106.0更改为107.0。

KSQL查询
ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId;
1 | 1 | 106.0
1 | 0 | 0.0
1 | 1 | 107.0

问题是,为什么每次我发布交易更新时上面显示的KSQL查询都返回一个“中间”值。如您所见,“计数”和“金额”字段显示0,0,然后KSQL查询立即将其“更正”为1,107.0。我对此行为感到有点困惑。

有人可以解释吗?

非常感谢。



1> Andrew Coate..:

感谢您的提问。我已经为我们的知识库添加了答案:https : //github.com/confluentinc/ksql/pull/3594/files。

当KSQL看到表中现有行的更新时,它将在内部发出CDC事件,该事件包含旧值和新值。聚合通过应用新值之前先取消旧值来处理此问题。

因此,在上面的示例中,当第二次插入发生时,KSQL首先撤消旧值。这导致COUNT下降1,而SUM下降旧值106.0,即下降到零。然后KSQL应用新行值,它看到COUNT1上升和SUM由新的价值上升107.0

默认情况下,将KSQL配置为在将结果刷新到Kafka之前最多缓冲2秒或10MB数据。这就是在此示例中插入值时可能会在输出上看到轻微延迟的原因。如果两个输出行都一起缓冲,那么KSQL将抑制第一个结果。这就是为什么您通常看不到中间行被输出的原因。配置commit.interval.mscache.max.bytes.buffering分别设置为2秒和10MB,可用于调整此行为。将这些设置之一设置为零将导致KSQL始终输出所有中间结果。

如果您每次都看到这些中间结果输出,则可能您已将这些设置之一或两者都设置为零。

我们有一个Github问题来增强KSQL以利用Kafka Stream的抑制功能,该功能将允许用户更多地控制如何实现结果。


推荐阅读
  • Netflix利用Druid实现高效实时数据分析
    本文探讨了全球领先的在线娱乐公司Netflix如何通过采用Apache Druid,实现了高效的数据采集、处理和实时分析,从而显著提升了用户体验和业务决策的准确性。文章详细介绍了Netflix在系统架构、数据摄取、管理和查询方面的实践,并展示了Druid在大规模数据处理中的卓越性能。 ... [详细]
  • window下kafka的安装以及测试
    目录一、安装JDK(需要安装依赖javaJDK)二、安装Kafka三、测试参考在Windows系统上安装消息队列kafka一、安装JDKÿ ... [详细]
  • Kafka Topic 数据管理与清理策略
    本文探讨了在生产环境中如何有效管理和定期清理Kafka Topic中的数据。介绍了基于时间、日志大小和日志起始偏移量三种清除方式,并重点讲解了基于时间的清除策略及其配置方法。 ... [详细]
  • 构建基于BERT的中文NL2SQL模型:一个简明的基准
    本文探讨了将自然语言转换为SQL语句(NL2SQL)的任务,这是人工智能领域中一项非常实用的研究方向。文章介绍了笔者在公司举办的首届中文NL2SQL挑战赛中的实践,该比赛提供了金融和通用领域的表格数据,并标注了对应的自然语言与SQL语句对,旨在训练准确的NL2SQL模型。 ... [详细]
  • 数据库内核开发入门 | 搭建研发环境的初步指南
    本课程将带你从零开始,逐步掌握数据库内核开发的基础知识和实践技能,重点介绍如何搭建OceanBase的开发环境。 ... [详细]
  • 本文详细介绍了如何在ECharts中使用线性渐变色,通过echarts.graphic.LinearGradient方法实现。文章不仅提供了完整的代码示例,还解释了各个参数的具体含义及其应用场景。 ... [详细]
  • Composer Registry Manager:PHP的源切换管理工具
    本文介绍了一个用于Composer的源切换管理工具——Composer Registry Manager。该项目旨在简化Composer包源的管理和切换,避免与常见的CRM系统混淆,并提供了详细的安装和使用指南。 ... [详细]
  • 本文详细介绍了Git分布式版本控制系统中远程仓库的概念和操作方法。通过具体案例,帮助读者更好地理解和掌握如何高效管理代码库。 ... [详细]
  • 本文详细介绍了Python编程语言的学习路径,涵盖基础语法、常用组件、开发工具、数据库管理、Web服务开发、大数据分析、人工智能、爬虫开发及办公自动化等多个方向。通过系统化的学习计划,帮助初学者快速掌握Python的核心技能。 ... [详细]
  • 探讨了小型企业在构建安全网络和软件时所面临的挑战和机遇。本文介绍了如何通过合理的方法和工具,确保小型企业能够有效提升其软件的安全性,从而保护客户数据并增强市场竞争力。 ... [详细]
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • 本文介绍如何在PostgreSQL数据库中正确插入和处理JSON数据类型,确保数据完整性和避免常见错误。 ... [详细]
  • 作为一名 Ember.js 新手,了解如何在路由和模型中正确加载 JSON 数据是至关重要的。本文将探讨两者之间的差异,并提供实用的建议。 ... [详细]
  • 本文详细介绍了在企业级项目中如何优化 Webpack 配置,特别是在 React 移动端项目中的最佳实践。涵盖资源压缩、代码分割、构建范围缩小、缓存机制以及性能优化等多个方面。 ... [详细]
  • 全面解析运维监控:白盒与黑盒监控及四大黄金指标
    本文深入探讨了白盒和黑盒监控的概念,以及它们在系统监控中的应用。通过详细分析基础监控和业务监控的不同采集方法,结合四个黄金指标的解读,帮助读者更好地理解和实施有效的监控策略。 ... [详细]
author-avatar
michael
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有