Checklist
- [x] I have included information about relevant versions
- [ ] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I have a Kafka topic with 300,000 messages. Using the code below, with exactly_once enabled, I tested the consumption and the count of the table 3 times. The count was always 300,000 if the process was not interrupted. Then I tested a fourth time and killed the Faust process in the middle of the consumption, then restarted it to finish. The table counter was missing a lot of events: the final count was 299,356.
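For context, a minimal sketch of how the 300,000 test messages could be produced with a throwaway Faust command (the payload format and the app name `seed_events` are assumptions, not part of the original report):

```python
import faust

# Throwaway producer app; broker and topic name match the reproduction code below.
app = faust.App('seed_events', broker='kafka://localhost:19092')
source = app.topic('example-source-topic')

@app.command()
async def seed():
    """Send 300,000 test messages to the source topic."""
    for i in range(300_000):
        await source.send(value=str(i).encode())

if __name__ == '__main__':
    app.main()
```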
Expected behavior
I expected not to lose events if I have exactly_once support and the Faust process is abruptly terminated and then restarted. So, in this case, I expected the final count to be 300,000.
Actual behavior
The final count was wrong: 299,356 instead of 300,000.
Full traceback
```python
import faust

app = faust.App(
    'count_events',
    processing_guarantee='exactly_once',
    broker='kafka://localhost:19092',
    store='rocksdb://',
    version=VERSION,
)

# source topic
source = app.topic('example-source-topic')

# table
table = app.Table('totals', default=int)

@app.agent(source)
async def process(stream):
    async for event in stream.events():
        table['total'] += 1

@app.page('/table/{total}/')
@app.table_route(table=table, match_info='total')
async def get_count(web, request, total):
    return web.json({
        total: table['total'],
    })
```
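To check the counter between runs, the page defined above can be queried over the worker's web interface; a small sketch, assuming Faust's default web port of 6066:

```python
# Hypothetical check of the counter exposed by the page above.
# Assumes the worker serves its web interface on the default port 6066.
import requests

resp = requests.get('http://localhost:6066/table/total/')
print(resp.json())  # expected {"total": 300000} after an uninterrupted run
```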
Versions
- Python 3.6.7
- Faust 1.6.0
- Operating system Ubuntu 18.10
- Kafka version 2.2.0
- RocksDB version (if applicable) 0.7.0
This question comes from the open-source project: robinhood/faust
The source topic in your example is not partitioned? Are you running a different example?
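If the partition layout is the suspect, one way to make it explicit is to pin the partition count on the app and on the topic so the source topic and the table changelog agree. This is a sketch only, not the reporter's code; the count of 8 is an assumption:

```python
import faust

# Sketch: declare partition counts explicitly so the source topic, the table
# changelog and any internal topics use the same layout. 8 is an assumed value.
app = faust.App(
    'count_events',
    processing_guarantee='exactly_once',
    broker='kafka://localhost:19092',
    store='rocksdb://',
    topic_partitions=8,
)
source = app.topic('example-source-topic', partitions=8)
table = app.Table('totals', default=int, partitions=8)
```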