热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Sendingadeletion/tombstonemessagetoatopicraisesanerror

IamtryingtobuildaTablethatmaintainsalistofavailableitemsbyID.Inorderto

I am trying to build a Table that maintains a list of available items by ID. In order to keep the table up-to-date, I need to send deletion messagse (i.e. same key as a previous message, but

1
value=None

) to a Faust topic. However, this fails. Am I using the API incorrectly or is this an issue with the default serializer?

Checklist


  • [x] I have included information about relevant versions

  • [ ] I have verified that the issue persists when using the
    1
    master

    branch of Faust.


Steps to reproduce

Send a message with

1
value=None

to a topic.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
py

import faust



app = faust.App('test', broker='kafka://localhost:9094', store='memory://')



class Item(faust.Record):

    id: str

    val: str



item_topic = app.topic('items', value_type=Item)

items = app.Table('items')



.agent(item_topic)

async def build_item_index(stream: faust.Stream):

    async for item in stream:

        if item:

            print(f'Adding {item.id} to index')

            items[item.id] = item

        else:

            print(f'Removing {item.id} from index')

            del items[item.id]



.task

async def on_started():

    await item_topic.send(key='1234', value=Item(1, 'a'))

    await item_topic.send(key='1234', value=None) # Causing the error


Expected behavior

Agents that process

1
item_topic

receive an event that is

1
None

Actual behavior

Agents never receive the deletion event.

Full traceback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
pytb

[2018-08-21 08:45:09,790: INFO]: [^Worker]: Ready

[2018-08-21 08:45:09,921: WARNING]: Adding 1 to index

[2018-08-21 08:45:09,994: ERROR]: [^---Agent*: api.test.build_item_index]: Crashed reason=ValueDecodeError('type object argument after ** must be a mapping, not NoneType',)

Traceback (most recent call last):

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value

    return cast(V, self._prepare_payload(typ, payload))

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload

    return model.from_data(value, preferred_type=model)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data

    return (self_cls or cls)(**data, __strict__=False)

TypeError: type object argument after ** must be a mapping, not NoneType



The above exception was the direct cause of the following exception:



Traceback (most recent call last):

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/agents/agent.py", line 527, in _execute_task

    await coro

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/api/test.py", line 14, in build_item_index

    async for item in stream:

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/streams.py", line 747, in __aiter__

    channel_value = await chan_slow_get()

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/channels.py", line 301, in __anext__

    return await self.queue.get()

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/mode/utils/queues.py", line 125, in get

    return await super().get()

  File "/usr/lib64/python3.6/asyncio/queues.py", line 167, in get

    yield from getter

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/transport/conductor.py", line 80, in on_message

    event = await chan.decode(message, propagate=True)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/topics.py", line 161, in decode

    value_type, message.value, serializer=value_serializer)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 104, in loads_value

    sys.exc_info()[2]) from exc

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value

    return cast(V, self._prepare_payload(typ, payload))

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload

    return model.from_data(value, preferred_type=model)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data

    return (self_cls or cls)(**data, __strict__=False)

faust.exceptions.ValueDecodeError: type object argument after ** must be a mapping, not NoneType

[2018-08-21 08:45:09,997: ERROR]: [^---Agent*: api.test.build_item_index]: Crashed reason=ValueDecodeError('type object argument after ** must be a mapping, not NoneType',)

Traceback (most recent call last):

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value

    return cast(V, self._prepare_payload(typ, payload))

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload

    return model.from_data(value, preferred_type=model)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data

    return (self_cls or cls)(**data, __strict__=False)

TypeError: type object argument after ** must be a mapping, not NoneType



The above exception was the direct cause of the following exception:



Traceback (most recent call last):

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/mode/services.py", line 688, in _execute_task

    await task

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/agents/agent.py", line 527, in _execute_task

    await coro

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/api/test.py", line 14, in build_item_index

    async for item in stream:

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/streams.py", line 747, in __aiter__

    channel_value = await chan_slow_get()

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/channels.py", line 301, in __anext__

    return await self.queue.get()

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/mode/utils/queues.py", line 125, in get

    return await super().get()

  File "/usr/lib64/python3.6/asyncio/queues.py", line 167, in get

    yield from getter

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/transport/conductor.py", line 80, in on_message

    event = await chan.decode(message, propagate=True)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/topics.py", line 161, in decode

    value_type, message.value, serializer=value_serializer)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 104, in loads_value

    sys.exc_info()[2]) from exc

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value

    return cast(V, self._prepare_payload(typ, payload))

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload

    return model.from_data(value, preferred_type=model)

  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data

    return (self_cls or cls)(**data, __strict__=False)

faust.exceptions.ValueDecodeError: type object argument after ** must be a mapping, not NoneType


Versions
  • Python version: 3.6.5

  • Faust version: 1.0.30

  • Operating system: Linux

  • Kafka version: 1.0.1

  • RocksDB version (if applicable)

该提问来源于开源项目:robinhood/faust

Thank you!


推荐阅读
author-avatar
mobiledu2502884357
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有