如何在Celery中检查任务状态?

134

如何检查Celery中的任务是否正在运行(具体而言,我正在使用celery-django)?

我已阅读文档并进行了谷歌搜索,但我没有找到类似这样的调用:

my_example_task.state() == RUNNING

我的使用情况是我有一个外部(java)转码服务。当我发送要进行转码的文档时,我希望检查运行该服务的任务是否正在运行,如果没有运行,则重新启动它。

我正在使用当前稳定版本-我相信是2.4。


在我的情况下,这部分有所帮助。 - Nima
13个回答

127

返回任务ID(此ID由.delay()提供),然后向Celery实例查询状态:

x = method.delay(1,2)
print x.task_id

在提问时,使用此 task_id 获取一个新的异步结果:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

12
谢谢,但是如果我无法访问 x 呢? - Marcin
6
你是将任务放入Celery队列的位置?在那里,你需要返回任务ID以便在未来跟踪该任务。 - Gregor
1
@ArnauOrriols,抱歉我不明白。AsyncResult(id)task.AsyncResult(id)有什么不同?x是指任务吗?你的意思是res.state会引发错误吗?后端配置是什么?我调用了async_result = run_instance.s(4).apply_async(),然后稍后使用res = async_result.get() except.. - 我是否失去了配置?抱歉,感到非常迷茫,想知道这是否可以澄清一些困惑(例如:https://dev59.com/f5Pfa4cB1Zd3GeqPFJdh和这个`get()`调用在异常情况下很少消失)。谢谢。 - Chris
2
@Chris,与@gregor代码的争议在于async_result的实例化。在您的用例中,您已经拥有了该实例,可以直接使用。但是如果您只有任务ID,并且需要实例化一个async_result实例才能调用async_result.get(),该怎么办?这是AsyncResult类的一个实例,但您不能使用原始类celery.result.AsyncResult,您需要从由app.task()包装的函数中获取该类。在您的情况下,您将执行async_result = run_instance.AsyncResult('task-id') - ArnauOrriols
1
但是你不能使用原始的类celery.result.AsyncResult,你需要从由app.task()包装的函数中获取该类。我认为这才是它实际上应该被使用的方式。请阅读代码:https://github.com/celery/celery/blob/c26e30bad8e141e80f2f62900474121ac52476ac/celery/result.py#L92 - nevelis
显示剩余3条评论

108
从任务ID创建一个AsyncResult对象是FAQ中推荐的获取任务状态的方法,当你只拥有任务ID时。

然而,在Celery 3.x中,存在一些重要的注意事项,如果不注意可能会出现问题。这确实取决于具体的用例场景。

默认情况下,Celery不记录“运行”状态。

为了让Celery记录任务正在运行,你必须将task_track_started设置为True。这里有一个简单的任务来测试这个功能:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

task_track_startedFalse时,即默认情况下,状态显示为PENDING,即使任务已经开始。如果将task_track_started设置为True,则状态将变为STARTED

状态PENDING的意思是“我不知道”。

状态为PENDINGAsyncResult并没有比Celery不知道任务状态更多的含义。这可能是由于许多原因造成的。

首先,AsyncResult可以使用无效的任务ID构建。这样的“任务”将被Celery视为待处理:

>>> task.AsyncResult("invalid").status
'PENDING'

好的,显然没有人会向 AsyncResult 提供无效的id。这很公平,但它也有一个影响,即AsyncResult 会认为Celery已经忘记了一个成功运行的任务,而将其视为PENDING。在一些使用场景中,这可能是个问题。问题的一部分取决于Celery如何配置以保留任务结果,因为它依赖于结果后端中“墓碑”的可用性。(“墓碑”是Celery文档中用于记录任务结束方式的数据块的术语。)如果task_ignore_result设置为True,则根本无法使用AsyncResult。更棘手的问题是Celery默认情况下会过期“墓碑”。result_expires设置默认为24小时。因此,如果您启动一个任务,并将其ID记录到长期存储中,24小时后再创建一个AsyncResult,状态将为PENDING

所有“真实任务”都从“PENDING”状态开始。因此,任务变成“PENDING”可能意味着该任务已被请求,但由于某种原因未进一步进行。或者它可能意味着任务已运行,但Celery忘记了其状态。
哎呀!AsyncResult对我没用。还有什么其他方法吗?
我更喜欢跟踪“目标”而不是跟踪“任务本身”。我确实保留了一些任务信息,但这些信息与跟踪目标相比较次要。目标存储在与Celery无关的存储中。当请求需要执行计算并取决于已实现某个目标时,它会检查是否已经实现了该目标,如果是,则使用此缓存的目标,否则启动将影响目标的任务,并向发出HTTP请求的客户端发送指示应等待结果的响应。
上面的变量名和超链接是针对 Celery 4.x 的。在 3.x 中,相应的变量和超链接是:CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES

1
那么,如果我想稍后检查结果(甚至在另一个进程中),我最好使用自己的实现?手动将结果存储到数据库中? - Franklin Yu
是的,我会将“目标”和“任务”的跟踪分开。我写了“执行依赖于某个目标的计算”。通常,“目标”也是一种计算。例如,如果我想向用户展示文章X,我必须将其从XML转换为HTML,但在此之前,我必须解决所有的参考文献。 (X就像一篇期刊文章。)我检查是否存在“已解决所有参考文献的文章X”的目标,并使用它而不是尝试检查计算所需的Celery任务的任务状态。 - Louis
并且信息“带有所有参考文献的文章X”被存储在内存缓存中,并存储在eXist-db数据库中。 - Louis

76

每个Task对象都有一个.request属性,其中包含它的AsyncRequest对象。因此,下面这行代码可以得到一个任务task的状态:

task.AsyncResult(task.request.id).state

3
有没有一种方法可以存储任务进度的百分比? - patrick
6
即使我等待足够长的时间任务完成,但当我执行此操作时,我得到一个永久处于“PENDING”状态的异步结果。有没有办法让它看到状态变化?我相信我的后端已经配置好了,并且我尝试将CELERY_TRACK_STARTED设置为True也没用。 - dstromberg
1
很抱歉,这个问题已经过去4年了,我无法提供帮助。你几乎肯定需要配置celery来跟踪状态。 - Marcin
进一步补充@dstromberg的观察,仅为确认起见,我选择了一个我确定已成功完成的celery任务,并检查了其“state”属性,它仍然返回“PENDING”。这似乎不是从终端跟踪celery任务状态的可靠方法。此外,我正在运行Celery Flower(Celery监控工具),由于某种原因,它没有显示我正在查找的任务在执行的任务列表中。我可能需要查看Flower设置,以查看是否有任何内容说仅显示过去的某个小时。 - Deep

19

虽然这是一个老问题,但我最近遇到了同样的问题。

如果你想获取task_id,可以按照以下方式操作:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

现在你完全了解task_id是什么,并且可以使用它来获取AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4

8
完全不必创建自己的任务ID并将其传递给apply_asyncapply_async返回的对象是一个AsyncResult对象,其中包含Celery生成的任务ID。 - Louis
1
请纠正我,但是基于某些输入生成UUID有时候是有用的,这样所有获取相同输入的调用都会得到相同的UUID。换句话说,有时候指定您的task_id是有用的。 - dstromberg
1
@dstromberg OP提出的问题是“如何检查任务状态”,而这里的答案是“如果你正在尝试获取task_id...”。既不需要检查任务状态,也不需要自己生成一个任务id来获取task_id。在您的评论中,您想象了一个超越“如何检查任务状态”和“如果您正在尝试获取task_id...” 的原因。如果您有此需求,则很好,但在此情况下并非如此。(此外,使用uuid()生成任务ID绝对不会比Celery默认情况做得更多。) - Louis
我同意OP并没有明确询问如何获得可预测的任务ID,但是目前对于OP问题的答案是“跟踪任务ID并执行x”。在我看来,在各种情况下跟踪任务ID都是不切实际的,因此该答案可能并不令人满意。正如@dstromberg指出的那样,无论是否出于这个原因,这个答案帮助我解决了我的用例(如果我能克服其他已知的限制)。 - claytond

18

您也可以创建自定义状态并在任务执行期间更新其值。 这个例子来自文档:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states


14

只需使用来自celery FAQ的此API即可。

result = app.AsyncResult(task_id)

这个很好用。


谢谢,这对我来说真是救命稻草!我们的Celery + Jobtastic任务之前在Celery 3.x中运行正常(result = AsyncResult(task_id)),但在Celery 4.x中不再工作(result = DownloadFileTask.AsyncResult(task_id))。显然,现在需要引用任务类,以便正确引导CELERY_RESULT_BACKEND。否则,由于某种原因,将使用DisabledBackend - Ranel Padon
更新:使用 celery_app.set_default() 也可以,而且更简单(即无需调整现有任务调用),因为它会自动绑定完全配置/引导的应用程序作为默认值,包括在隔离的 AsyncResult() 调用中。 - Ranel Padon

6

2020年的答案:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"

0
  • 首先,在你的Celery APP中:

vi my_celery_apps/app1.py

app = Celery(worker_name)

接下来,切换到任务文件,在你的 celery 应用模块中导入 app。
vi tasks/task1.py
from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """



0

尝试:

task.AsyncResult(task.request.id).state

这将提供Celery任务状态。如果Celery任务已经处于FAILURE状态,它将抛出异常:

引发意外错误:KeyError('exc_type',)


0

我在

Celery项目工作人员指南检查工人

中找到了有用的信息。

对于我的情况,我正在检查Celery是否正在运行。

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

你可以使用检查工具来满足你的需求。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接