I am trying to build a Table that maintains a list of available items by ID. To keep the table up to date, I need to send deletion messages (i.e. messages with the same key as a previous message, but `value=None`) to a Faust topic. However, this fails. Am I using the API incorrectly, or is this an issue with the default serializer?
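For clarity, the deletion semantics I have in mind can be sketched in plain Python, without Faust or Kafka. The function name `apply_message` and the plain `dict` standing in for the Table are made up for illustration only; this is a minimal sketch of the intended behaviour, not of how Faust implements it.

```py
from typing import Dict, Optional


def apply_message(index: Dict[str, str], key: str, value: Optional[str]) -> None:
    """Apply one keyed message to the index; value=None means deletion."""
    if value is None:
        # Deletion message: drop the key, tolerating keys that are unknown.
        index.pop(key, None)
    else:
        # Regular message: insert or overwrite the entry for this key.
        index[key] = value


index: Dict[str, str] = {}
apply_message(index, '1234', 'a')   # item becomes available
apply_message(index, '1234', None)  # same key, value=None removes it
```

After both calls the index is empty again, which is what I would like the Table to end up with.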
`master` branch of Faust.
Send a message with `value=None` to a topic.
```py
import faust

app = faust.App('test', broker='kafka://localhost:9094', store='memory://')

class Item(faust.Record):
    id: str
    val: str

item_topic = app.topic('items', value_type=Item)
items = app.Table('items')

@app.agent(item_topic)
async def build_item_index(stream: faust.Stream):
    async for item in stream:
        if item:
            print(f'Adding {item.id} to index')
            items[item.id] = item
        else:
            print(f'Removing {item.id} from index')
            del items[item.id]

@app.task
async def on_started():
    await item_topic.send(key='1234', value=Item(1, 'a'))
    await item_topic.send(key='1234', value=None)  # Causing the error
```
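Expected behavior: agents that process `item_topic` receive an event whose value is `None`.

Actual behavior: agents never receive the deletion event. The agent crashes with a `ValueDecodeError` while the message is being decoded.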
```pytb
[2018-08-21 08:45:09,790: INFO]: [^Worker]: Ready
[2018-08-21 08:45:09,921: WARNING]: Adding 1 to index
[2018-08-21 08:45:09,994: ERROR]: [^---Agent*: api.test.build_item_index]: Crashed reason=ValueDecodeError('type object argument after ** must be a mapping, not NoneType',)
Traceback (most recent call last):
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value
    return cast(V, self._prepare_payload(typ, payload))
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload
    return model.from_data(value, preferred_type=model)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data
    return (self_cls or cls)(**data, __strict__=False)
TypeError: type object argument after ** must be a mapping, not NoneType

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/agents/agent.py", line 527, in _execute_task
    await coro
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/api/test.py", line 14, in build_item_index
    async for item in stream:
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/streams.py", line 747, in __aiter__
    channel_value = await chan_slow_get()
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/channels.py", line 301, in __anext__
    return await self.queue.get()
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/mode/utils/queues.py", line 125, in get
    return await super().get()
  File "/usr/lib64/python3.6/asyncio/queues.py", line 167, in get
    yield from getter
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/transport/conductor.py", line 80, in on_message
    event = await chan.decode(message, propagate=True)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/topics.py", line 161, in decode
    value_type, message.value, serializer=value_serializer)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 104, in loads_value
    sys.exc_info()[2]) from exc
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value
    return cast(V, self._prepare_payload(typ, payload))
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload
    return model.from_data(value, preferred_type=model)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data
    return (self_cls or cls)(**data, __strict__=False)
faust.exceptions.ValueDecodeError: type object argument after ** must be a mapping, not NoneType
[2018-08-21 08:45:09,997: ERROR]: [^---Agent*: api.test.build_item_index]: Crashed reason=ValueDecodeError('type object argument after ** must be a mapping, not NoneType',)
Traceback (most recent call last):
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value
    return cast(V, self._prepare_payload(typ, payload))
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload
    return model.from_data(value, preferred_type=model)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data
    return (self_cls or cls)(**data, __strict__=False)
TypeError: type object argument after ** must be a mapping, not NoneType

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/mode/services.py", line 688, in _execute_task
    await task
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/agents/agent.py", line 527, in _execute_task
    await coro
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/api/test.py", line 14, in build_item_index
    async for item in stream:
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/streams.py", line 747, in __aiter__
    channel_value = await chan_slow_get()
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/channels.py", line 301, in __anext__
    return await self.queue.get()
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/mode/utils/queues.py", line 125, in get
    return await super().get()
  File "/usr/lib64/python3.6/asyncio/queues.py", line 167, in get
    yield from getter
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/transport/conductor.py", line 80, in on_message
    event = await chan.decode(message, propagate=True)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/topics.py", line 161, in decode
    value_type, message.value, serializer=value_serializer)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 104, in loads_value
    sys.exc_info()[2]) from exc
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 99, in loads_value
    return cast(V, self._prepare_payload(typ, payload))
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/serializers/registry.py", line 122, in _prepare_payload
    return model.from_data(value, preferred_type=model)
  File "/home/michael/arbeit/digitalernachschub/ameto/ameto/api/venv/lib64/python3.6/site-packages/faust/models/record.py", line 280, in from_data
    return (self_cls or cls)(**data, __strict__=False)
faust.exceptions.ValueDecodeError: type object argument after ** must be a mapping, not NoneType
```
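The innermost frame of the traceback unpacks the decoded payload with `**data`, which cannot work when the payload is `None`. That part can be reproduced without Faust. The class `FakeRecord` and the helper `decode_value` below are hypothetical stand-ins, not Faust code; the `None` guard is only a sketch of the behaviour I would expect, assuming a `None` payload should be passed through to the agent unchanged.

```py
from typing import Any, Mapping, Optional


class FakeRecord:
    """Hypothetical stand-in for a record model; not Faust's implementation."""

    def __init__(self, id: str, val: str) -> None:
        self.id = id
        self.val = val

    @classmethod
    def from_data(cls, data: Mapping[str, Any]) -> 'FakeRecord':
        # Same pattern as the failing frame: keyword-unpack the payload.
        return cls(**data)


def decode_value(data: Optional[Mapping[str, Any]]) -> Optional[FakeRecord]:
    """Decode a payload, passing a None (deletion) payload through as-is."""
    if data is None:
        return None
    return FakeRecord.from_data(data)


# Unpacking None raises TypeError, as in the traceback above.
try:
    FakeRecord.from_data(None)
except TypeError as exc:
    error_message = str(exc)

# With the guard, the deletion payload is returned as None instead.
decoded = decode_value(None)
```

The exact wording of the `TypeError` differs between Python versions, but it always states that the argument after `**` must be a mapping, not `NoneType`.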
This question originates from the open-source project robinhood/faust.
Thank you!