作者:mobiledu2502923977 | 来源:互联网 | 2023-05-19 03:24
IfollowtheCeleryDjangotutorialandthetasksIseeintheexample(add,mul)workformeperfec
I follow the Celery Django tutorial and the tasks I see in the example (add, mul
) work for me perfectly. I get the correct response when I do res = add.delay(1,2); res.get()
.
我按照Celery Django教程和我在示例中看到的任务(add,mul)完美地为我工作。当我做res = add.delay(1,2)时,我得到正确的答案; res.get()。
But I get *** NotRegistered: u'pipeline.tasks.sayhello'
when I try to execute another my task res = sayhello.delay('trex')
.
但是当我尝试执行另一个我的任务res = sayhello.delay('trex')时,我得到*** NotRegistered:u'pipeline.tasks.sayhello'。
If I do res = sayhello('trex')
then I can get the result by just typing res
. But in this way, I execute the function ornidarly, without using Celery.
如果我做res = sayhello('trex'),那么我只需输入res即可得到结果。但是通过这种方式,我可以在不使用Celery的情况下执行ornidarly函数。
The task works only if I run it in the Django shell ./manage shell
该任务仅在我在Django shell ./manage shell中运行时才有效
>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'
So, the problem is that I can't execute sayhello
task from pipeline/views.py
. But I can execute tasks add
and mul
from there.
所以,问题是我无法从pipeline / views.py执行sayhello任务。但我可以从那里执行任务add和mul。
Why is that? How to run tasks correctly from views.py
?
这是为什么?如何从views.py正确运行任务?
The error full message:
错误已完整消息:
[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'
Django version
Django版本
1.9.7
Celery version:
芹菜版:
celery==4.0.0
django-celery==3.1.17
Django project dir tree:
Django项目目录树:
rocket
├── etl
│ ├── etl
│ │ ├── celery.py
│ │ ├── __init__.py
│ │ ├── settings
│ │ │ ├── base.py
│ │ │ ├── dev.py
│ │ │ ├── __init__.py
│ │ │ ├── production.py
│ │ │ └── test.py
│ │ ├── urls.py
│ │ ├── wsgi.py
│ ├── manage.py
│ ├── pipeline
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── tasks.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ ├── views.py
etl/pipeline/views.py
ETL /管道/ views.py
from .tasks import *
def get_response(request):
result = add.delay(1, 2)
result.get()
result = sayhello.delay('tiger')
result.get()
etl/pipeline/tasks.py
ETL /管道/ tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
@shared_task
def sayhello(name):
return "Hello %s" % name
Also I tried this:
我也试过这个:
from celery.decorators import task
@task(name="sayhello")
def sayhello(name):
return "Hello {0}".format(name)
etl/celery.py
ETL / celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')
app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
etl/__init__py
ETL / __ init__py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
etl/settings/base.py
ETL /设置/ base.py
...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_COnTENT= ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZOnE= 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )
1 个解决方案