我从针对由Kafka主题定义的KTable的KSQL查询中获得意外结果。KTABLE是“交易”,并且由压缩主题“ localhost.dbo.TradeHistory”支持。它应该包含由TradeId键控的股票交易的最新信息。主题的键是TradeId。每笔交易都有一个AccountId,我正在尝试构造一个查询以获取按帐户分组的交易金额的总和。
交易表的定义ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId'); ... ksql> describe extended Trades; Name : TRADES Type : TABLE Key field : TRADEID Key format : STRING Timestamp field : Not set - usinglocalhost.dbo.TradeHistory主题的配置Value format : JSON Kafka topic : localhost.dbo.TradeHistory (partitions: 1, replication: 1) Field | Type --------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) TRADEID | INTEGER ACCOUNTID | INTEGER SPN | INTEGER AMOUNT | DOUBLE --------------------------------------- Local runtime statistics ------------------------ consumer-messages-per-sec: 0 consumer-total-bytes: 3709 consumer-total-messages: 39 last-message: 2019-10-12T20:52:16.552Z (Statistics of the local KSQL server interaction with the Kafka topic localhost.dbo.TradeHistory)
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --describe --topic localhost.dbo.TradeHistory Topic:localhost.dbo.TradeHistory PartitionCount:1 ReplicationFactor:1 Configs:min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,cleanup.policy=compact,segment.ms=100 Topic: localhost.dbo.TradeHistory Partition: 0 Leader: 1 Replicas: 1 Isr: 1
在我的测试中,我正在使用TradeId 2将消息添加到localhost.dbo.TradeHistory主题,该消息只是更改了交易量。仅金额被更新;AccountId仍为1。
localhost.dbo.TradeHistory主题中的消息/usr/bin/kafka-console-consumer --bootstrap-server broker:9092 --property print.key=true --topic localhost.dbo.TradeHistory --from-beginning ... (earlier values redacted) ... 2 {"TradeHistoryId":47,"TradeId":2,"AccountId":1,"Spn":1,"Amount":106.0,"__table":"TradeHistory"} 2 {"TradeHistoryId":48,"TradeId":2,"AccountId":1,"Spn":1,"Amount":107.0,"__table":"TradeHistory"}
上面的主题转储显示(帐户1中的)贸易额2从106.0更改为107.0。
KSQL查询ksql> select AccountId, count(*) as Count, sum(Amount) as Total from Trades group by AccountId; 1 | 1 | 106.0 1 | 0 | 0.0 1 | 1 | 107.0
问题是,为什么每次我发布交易更新时上面显示的KSQL查询都返回一个“中间”值。如您所见,“计数”和“金额”字段显示0,0,然后KSQL查询立即将其“更正”为1,107.0。我对此行为感到有点困惑。
有人可以解释吗?
非常感谢。
感谢您的提问。我已经为我们的知识库添加了答案:https : //github.com/confluentinc/ksql/pull/3594/files。
当KSQL看到表中现有行的更新时,它将在内部发出CDC事件,该事件包含旧值和新值。聚合通过应用新值之前先取消旧值来处理此问题。
因此,在上面的示例中,当第二次插入发生时,KSQL首先撤消旧值。这导致COUNT
下降1,而SUM
下降旧值106.0
,即下降到零。然后KSQL应用新行值,它看到COUNT
1上升和SUM
由新的价值上升107.0
。
默认情况下,将KSQL配置为在将结果刷新到Kafka之前最多缓冲2秒或10MB数据。这就是在此示例中插入值时可能会在输出上看到轻微延迟的原因。如果两个输出行都一起缓冲,那么KSQL将抑制第一个结果。这就是为什么您通常看不到中间行被输出的原因。配置commit.interval.ms
和cache.max.bytes.buffering
分别设置为2秒和10MB,可用于调整此行为。将这些设置之一设置为零将导致KSQL始终输出所有中间结果。
如果您每次都看到这些中间结果输出,则可能您已将这些设置之一或两者都设置为零。
我们有一个Github问题来增强KSQL以利用Kafka Stream的抑制功能,该功能将允许用户更多地控制如何实现结果。