作者:我爱麦兜李 | 来源:互联网 | 2024-12-26 18:17
最近,我在构建一个使用20个流线程的Kafka Streams应用程序时遇到了一些问题。该应用旨在计算固定时间间隔内不同用户的消费金额。然而,在从本地商店查询用户消费记录时,发现结果少于实际花费。尽管查阅了官方文档和其他资料,仍未找到满意的解决方案。
我使用的Kafka版本为0.11.0.3,服务器和流API均为同一版本。应用程序配置如下:
关键配置信息:
- 复制因子:3
- 流线程数:20
- 提交间隔:1000ms
- 分区分配策略:StickyAssignor
- 最大等待时间:500ms
- 最大轮询记录数:5000
- 最长轮询间隔:300秒
- 心跳间隔:3秒
- 会话超时:30秒
- 自动偏移重置:最新
Kafka消息结构:
Kafka流构建代码:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream peopleSpendStream = kStreamBuilder.stream(topic);
peopleSpendStream.groupByKey()
.aggregate(() -> new HashMap(8192), (key, value, aggregate) -> {
aggregate.merge(key, value, Double::sum);
return aggregate;
}, TimeWindows.of(ONE_MINUTE).until(ONE_HOUR * 10), // 1分钟窗口,保留9小时
new HashMapSerde<>(), // 实际上使用Jackson进行序列化和反序列化
PEOPLE_SPEND_STORE_NAME);
查询代码:
long currentTime = System.currentTimeMillis();
for (String name : names) { // 按用户名查询
try (WindowStoreIterator> iterator = store.fetch(name, currentTime - TEN_MINUTES_MS, currentTime)) {
iterator.forEachRemaining(kv -> log.info("name = {}, time = {}, cost = {}", name, kv.key, kv.value));
}
}
在分析过程中,我发现以下几点可能是导致问题的原因:
- 配置参数可能需要进一步优化,例如调整提交间隔或增加分区数量。
- 可能存在网络延迟或服务器负载过高的情况,影响了数据同步。
- 代码逻辑中可能存在并发处理不当的问题,特别是在高并发场景下。
建议您检查这些方面,并根据实际情况进行调整。如果您有更多具体问题或需要进一步的帮助,请随时联系我。