Celery收到未注册的任务类型(run example)。

155

我正在尝试运行Celery文档中的示例

我运行了:celeryd --loglevel=INFO

/usr/local/lib/python2.7/dist-packages/celery/loaders/default.py:64: NotConfigured: No 'celeryconfig' module found! Please make sure it exists and is available to Python.
  "is available to Python." % (configname, )))
[2012-03-19 04:26:34,899: WARNING/MainProcess]  

 -------------- celery@ubuntu v2.5.1
---- **** -----
--- * ***  * -- [Configuration]
-- * - **** ---   . broker:      amqp://guest@localhost:5672//
- ** ----------   . loader:      celery.loaders.default.Loader
- ** ----------   . logfile:     [stderr]@INFO
- ** ----------   . concurrency: 4
- ** ----------   . events:      OFF
- *** --- * ---   . beat:        OFF
-- ******* ----
--- ***** ----- [Queues]
 --------------   . celery:      exchange:celery (direct) binding:celery

tasks.py:

# -*- coding: utf-8 -*-
from celery.task import task

@task
def add(x, y):
    return x + y

运行任务的文件:run_task.py

# -*- coding: utf-8 -*-
from tasks import add
result = add.delay(4, 4)
print (result)
print (result.ready())
print (result.get())

在同一个文件夹中的 celeryconfig.py 文件中:

CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_RESULT_EXPIRES = 300
当我运行"run_task.py"时,在Python控制台上。
eb503f77-b5fc-44e2-ac0b-91ce6ddbf153
False

celeryd服务器上的错误

[2012-03-19 04:34:14,913: ERROR/MainProcess] Received unregistered task of type 'tasks.add'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'retries': 0, 'task': 'tasks.add', 'utc': False, 'args': (4, 4), 'expires': None, 'eta': None, 'kwargs': {}, 'id': '841bc21f-8124-436b-92f1-e3b62cafdfe7'}

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 444, in receive_message
    self.strategies[name](message, body, message.ack_log_error)
KeyError: 'tasks.add'
请解释问题所在。

16
你能分享一下问题是什么,以及你是如何解决的吗?被采纳的答案没有明确说明其他人该如何解决这个问题。谢谢。 - Jordan Reiter
4
我支持Jordan的观点-这一点毫无帮助。被踩了。 - Jay Taylor
3
AIHO的答案是正确的:CELERY_IMPORTS = ("tasks", ) - Alp
40个回答

124
重新启动工作服务器。我之前遇到了同样的问题,通过重新启动解决了。

6
这对我有用。如果您正在使用celeryd脚本,则工作进程会在启动时导入您的任务模块。即使您创建了更多任务函数或修改了现有函数,工作进程仍将使用其内存中的副本,就像在读取它们时一样。 - Mark
4
注意:您可以通过运行“celery inspect registered”来验证您的任务是否已注册。 - Nick Brady
5
你也可以使用选项 --autoreload 启动 celery,这样每次代码改动后都会重新启动 celery。 - Sergey Lyapustin
很遗憾,此方法已不再推荐使用。您可以尝试使用以下链接提供的解决方案:https://avilpage.com/2017/05/how-to-auto-reload-celery-workers-in-development.html - Tomasz Szkudlarek

62

我曾遇到同样的问题: "Received unregistered task of type.." 的原因是 celeryd 服务在启动时没有找到并注册任务(顺便提一句,当你运行 ./manage.py celeryd --loglevel=info 时,任务列表将可见)。

这些任务应该在设置文件中声明为 CELERY_IMPORTS = ("tasks", )
如果你有一个特殊的 celery_settings.py 文件,需要在启动 celeryd 服务时使用 --settings=celery_settings.py 声明,就像 digivampire 写的那样。


2
谢谢,事实上我的问题是因为我使用了~/path/to/celery/celeryd来启动celery,而不是使用manage.py命令! - Antoine

56

您可以在celery.registry.TaskRegistry类中查看当前注册任务的列表。可能是因为您的celeryconfig(在当前目录中)不在PYTHONPATH中,因此无法找到它并返回默认值。启动celery时,请明确指定它。

celeryd --loglevel=INFO --settings=celeryconfig

你还可以设置--loglevel=DEBUG,这样你很可能会立即看到问题。


7
对于--loglevel=DEBUG给予+1,我的任务中出现了语法错误。 - Jacob Valenta
21
celeryd已经过时。现在应该像这样运行celery worker来处理Django,例如:celery --app=your_app.celery worker --loglevel=info - andilabs
对于我来说(celery 3.1.23),我必须使用celery.registry.tasks来查看所有当前任务的列表。您可以随时通过运行dir(celery.registry)进行检查。 - Nick Brady
从我的角度来看,对于“--loglevel=DEBUG”,意思是调试级别日志。 - Shobi

48
app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])
请包含=['proj.tasks'] 你需要进入顶层目录,然后执行此命令。
celery -A app.celery_module.celeryapp worker --loglevel=info

celery -A celeryapp worker --loglevel=info

在您的 celeryconfig.py 文件中,输入 imports = ("path.path.tasks",)

请在其他模块中调用任务!!!!!


3
如果您使用相对导入,需要添加"include"参数。我通过添加它来解决了我的问题。 - CK.Nguyen
这应该是被接受的答案;你需要从顶级目录调用worker,以便celery启动命令中的路径与客户端中的导入路径匹配。 - Edward Gaere
我完全不理解这个。您能否提供一个代码示例或解释一下 'proj.tasks' 代表什么?您指的是 settings.py 所在的根文件夹还是 tasks.py 所处的应用程序? - codyc4321
没有代码示例来说明应该在 celeryconfig.py 中放置什么。 - codyc4321

42
无论使用CELERY_IMPORTS还是autodiscover_tasks,重要的是任务能够被找到,并且在Celery中注册的任务名称应该与工作进程尝试获取的名称匹配。当启动Celery时,执行celery worker -A project --loglevel=DEBUG命令,您应该看到任务的名称。例如,如果我在celery.py中有一个debug_task任务。
[tasks]
. project.celery.debug_task
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
如果您在列表中看不到自己的任务,请检查您的celery配置是否正确导入了任务,可以在--setting--configceleryconfigconfig_from_object中进行设置。 如果您使用的是celery beat,请确保CELERYBEAT_SCHEDULE中使用的任务名称task与celery任务列表中的名称匹配。

这非常有帮助。任务的名称需要与您的CELERYBEAT_SCHEDULE中的“task”键匹配。 - ss_millionaire
重要的是任务能够被找到,并且在Celery中注册的任务名称应该与工作进程尝试获取的名称匹配。好的观点! - Light.G
这是正确的答案。在BEAT_SCHEDULER中,您的任务名称应与自动发现任务列表中显示的内容匹配。因此,如果您使用了@task(name='check_periodically'),那么它应该与您在节拍计划中放置的内容相匹配,即:CELERY_BEAT_SCHEDULE = { 'check_periodically': { 'task': 'check_periodically', 'schedule': timedelta(seconds=1) } - Mormoran

30
我也遇到了同样的问题;我添加了
CELERY_IMPORTS=("mytasks")

我在celeryconfig.py文件中解决了它。


12
注意,这应该是一个列表或元组:CELERY_IMPORTS = ['my_module'] - asksol
这对我来说解决了问题。 - Riziero

14

对我来说有效的方法是为celery任务装饰器添加明确的名称。我把我的任务声明从@app.tasks改成了@app.tasks(name='module.submodule.task')

以下是一个例子

起初,我的任务看起来像这样:

# tasks/test_tasks.py
@celery.task
def test_task():
    print("Celery Task  !!!!")

我将它改为:

# tasks/test_tasks.py
@celery.task(name='tasks.test_tasks.test_task')
def test_task():
    print("Celery Task  !!!!")

当您没有专门的tasks.py文件可以将其包含在celery配置中时,此方法很有帮助。


这对我也起作用了,但如果我在“name” kwarg中指定了完整路径,而不是只复制名称,那么它就不起作用,所以只需使用celery.task(name='test_task')。很蠢,但它有效。正在尝试弄清楚为什么。 - Chris
也适用于我。 - Edward Gaere

12

使用 --settings 对我没有起作用。我不得不使用以下内容才能让所有东西正常工作:

celery --config=celeryconfig --loglevel=INFO

以下是已添加CELERY_IMPORTS的celeryconfig文件:

# Celery configuration file
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'America/Los_Angeles'
CELERY_ENABLE_UTC = True

CELERY_IMPORTS = ("tasks",)

我的设置略微麻烦,因为我正在使用supervisor将celery作为守护程序启动。


11

对于我来说,解决这个错误的方法是确保包含任务的应用程序在Django的INSTALLED_APPS设置中被包含。


此外,这些任务需要从<app>/tasks.py中访问。 - Niko Pasanen

7
在我的情况下,问题是我的项目没有正确地获取autodiscover_tasks。 在celery.py文件中,用于获取autodiscover_tasks的代码如下:
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
我把它更改为以下内容:
from django.apps import apps
app.autodiscover_tasks(lambda: [n.name for n in apps.get_app_configs()])

祝你一切顺利。


好的,它确实有效,但为什么?顺便说一下,谢谢 :) - Taras Mykhalchuk

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,