Celery工作者细节

3

我有一个celery任务,队列中有100个输入数据,需要使用5个工作进程执行。

  • 如何获取正在执行哪个输入的工作进程?
  • 每个工作进程执行了多少个输入以及它们的状态?
  • 如果任何任务失败,如何单独获取失败的输入数据并重新使用可用的工作进程执行?

是否有可能根据工作进程进行自定义celery配置。

我们可以结合celery工作进程限制和flower。

我没有使用任何框架。

2个回答

0
如何获取正在执行哪个输入的工作程序?
使用多个工作程序有两个选项:
1. 您可以使用单独的运行命令分别运行每个工作程序。 2. 您可以使用命令行选项-c(并发)在一个命令中运行。
第一种方法,flower将支持它,并显示所有工作程序、所有任务(您调用的输入)、哪个工作程序处理了哪个任务以及其他信息。
使用第二种方法,flower将显示单个工作程序正在处理的所有任务。在这种情况下,您只能通过查看由celery worker生成的日志来区分,因为日志会存储哪个工作程序线程执行了哪个任务。因此,考虑到您的要求,我认为您最好使用第一种选项。
每个工作程序执行了多少个输入和其状态?
正如我所提到的,使用第一种方法,flower将为您提供此信息。
如果任何任务失败,如何单独获取失败的输入数据并重新使用可用的工作程序执行?

Flower提供筛选器来过滤失败的任务,并在退出时提供任务返回的状态。您还可以设置celery应重试失败的任务次数。但是,即使经过重试任务仍然失败,那么您仍需要自己重新启动该任务。


0
For the first and second question:

1) Using Flower API:
You can use celery flower to keep track of it. Flower api can provide you the information like which task is being executed by which worker through simple api calls (/api/task/info/<task_id>) 

2) Querying celery directly:
from celery import Celery
celery = Celery('vwadaptor', broker='redis://workerdb:6379/0',backend='redis://workerdb:6379/0')
celery.control.inspect().active()

3) Using celery events:
   Link : http://docs.celeryproject.org/en/latest/userguide/monitoring.html
   (look Real-time Processing)
   You can create an event  ( task created, task received, etc) and the response will have the worker name(hostname , see the link)

For the third question:
Use the config entry 'CELERY_ACKS_LATE=True' to retry failed tasks.
celery.conf.update(
    CELERY_ACKS_LATE=True,
)

You can also track failed tasks using celery events mentioned above and retry failed tasks manually.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接