我正在使用celery和django-celery。我定义了一个定期任务,希望进行测试。是否可以从shell手动运行定期任务以便查看控制台输出?
我正在使用celery和django-celery。我定义了一个定期任务,希望进行测试。是否可以从shell手动运行定期任务以便查看控制台输出?
你尝试过在Django shell中直接运行任务吗?您可以使用任务的.apply
方法确保它立即在本地运行。
假设任务在Django应用程序myapp
中名为my_task
,并位于tasks
子模块中:
$ python manage.py shell
>>> from myapp.tasks import my_task
>>> eager_result = my_task.apply()
结果实例具有与通常的AsyncResult
类型相同的API,唯一的区别是结果总是急切地在本地计算,并且.apply()
方法将阻塞直到任务完成运行。
如果您的意思是只有在条件不满足时触发任务,例如,定期时间未达到。您可以分两步完成。
1.获取您的任务ID。
您可以通过键入来完成此操作。
celery inspect registered
你会看到类似于app.tasks.update_something
的东西。
如果没有出现,很可能是因为celery
没有启动。只需运行它即可。
2.使用celery call
运行任务
celery call app.tasks.update_something
要获取更多详细信息,只需输入
celery --help
celery inspect --help
celery call --help
celery -A yourapp call app.tasks.update_something --kwargs='{"key": value,...}'
。 - Erik Kalkokeninspect
功能很棒,但遗憾的是它仅适用于 "RabbitMQ(AMQP)和 Redis 传输" (而不包括像 filesystem
这样的其他传输方式)。 - R.我认为你需要打开两个终端窗口:一个用于在Python/Django shell中执行任务,另一个用于运行celery worker
(python manage.py celery worker
)。并且如前面的回答所说,你可以使用apply()
或apply_async()
来运行任务。
我已经编辑了答案,以便您不再使用废弃的命令。
manage.py
脚本。)抱歉。 - Platinum Azure