我有一个celery任务,队列中有100个输入数据,需要使用5个工作进程执行。
- 如何获取正在执行哪个输入的工作进程?
- 每个工作进程执行了多少个输入以及它们的状态?
- 如果任何任务失败,如何单独获取失败的输入数据并重新使用可用的工作进程执行?
是否有可能根据工作进程进行自定义celery配置。
我们可以结合celery工作进程限制和flower。
我没有使用任何框架。
我有一个celery任务,队列中有100个输入数据,需要使用5个工作进程执行。
是否有可能根据工作进程进行自定义celery配置。
我们可以结合celery工作进程限制和flower。
我没有使用任何框架。
Flower提供筛选器来过滤失败的任务,并在退出时提供任务返回的状态。您还可以设置celery应重试失败的任务次数。但是,即使经过重试任务仍然失败,那么您仍需要自己重新启动该任务。
For the first and second question:
1) Using Flower API:
You can use celery flower to keep track of it. Flower api can provide you the information like which task is being executed by which worker through simple api calls (/api/task/info/<task_id>)
2) Querying celery directly:
from celery import Celery
celery = Celery('vwadaptor', broker='redis://workerdb:6379/0',backend='redis://workerdb:6379/0')
celery.control.inspect().active()
3) Using celery events:
Link : http://docs.celeryproject.org/en/latest/userguide/monitoring.html
(look Real-time Processing)
You can create an event ( task created, task received, etc) and the response will have the worker name(hostname , see the link)
For the third question:
Use the config entry 'CELERY_ACKS_LATE=True' to retry failed tasks.
celery.conf.update(
CELERY_ACKS_LATE=True,
)
You can also track failed tasks using celery events mentioned above and retry failed tasks manually.