如何检查Celery中的任务是否正在运行(具体而言,我正在使用celery-django)?
我已阅读文档并进行了谷歌搜索,但我没有找到类似这样的调用:
my_example_task.state() == RUNNING
我的使用情况是我有一个外部(java)转码服务。当我发送要进行转码的文档时,我希望检查运行该服务的任务是否正在运行,如果没有运行,则重新启动它。
我正在使用当前稳定版本-我相信是2.4。
如何检查Celery中的任务是否正在运行(具体而言,我正在使用celery-django)?
我已阅读文档并进行了谷歌搜索,但我没有找到类似这样的调用:
my_example_task.state() == RUNNING
我的使用情况是我有一个外部(java)转码服务。当我发送要进行转码的文档时,我希望检查运行该服务的任务是否正在运行,如果没有运行,则重新启动它。
我正在使用当前稳定版本-我相信是2.4。
返回任务ID(此ID由.delay()提供),然后向Celery实例查询状态:
x = method.delay(1,2)
print x.task_id
在提问时,使用此 task_id
获取一个新的异步结果:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
x
呢? - MarcinAsyncResult(id)
和task.AsyncResult(id)
有什么不同?x
是指任务吗?你的意思是res.state
会引发错误吗?后端配置是什么?我调用了async_result = run_instance.s(4).apply_async()
,然后稍后使用res = async_result.get() except..
- 我是否失去了配置?抱歉,感到非常迷茫,想知道这是否可以澄清一些困惑(例如:https://dev59.com/f5Pfa4cB1Zd3GeqPFJdh和这个`get()`调用在异常情况下很少消失)。谢谢。 - Chrisasync_result
的实例化。在您的用例中,您已经拥有了该实例,可以直接使用。但是如果您只有任务ID,并且需要实例化一个async_result
实例才能调用async_result.get()
,该怎么办?这是AsyncResult
类的一个实例,但您不能使用原始类celery.result.AsyncResult
,您需要从由app.task()
包装的函数中获取该类。在您的情况下,您将执行async_result = run_instance.AsyncResult('task-id')
。 - ArnauOrriolsAsyncResult
对象是FAQ中推荐的获取任务状态的方法,当你只拥有任务ID时。
然而,在Celery 3.x中,存在一些重要的注意事项,如果不注意可能会出现问题。这确实取决于具体的用例场景。
为了让Celery记录任务正在运行,你必须将task_track_started
设置为True
。这里有一个简单的任务来测试这个功能:
@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state
task_track_started
为False
时,即默认情况下,状态显示为PENDING
,即使任务已经开始。如果将task_track_started
设置为True
,则状态将变为STARTED
。
PENDING
的意思是“我不知道”。状态为PENDING
的AsyncResult
并没有比Celery不知道任务状态更多的含义。这可能是由于许多原因造成的。
首先,AsyncResult
可以使用无效的任务ID构建。这样的“任务”将被Celery视为待处理:
>>> task.AsyncResult("invalid").status
'PENDING'
好的,显然没有人会向 AsyncResult
提供无效的id。这很公平,但它也有一个影响,即AsyncResult
会认为Celery已经忘记了一个成功运行的任务,而将其视为PENDING
。在一些使用场景中,这可能是个问题。问题的一部分取决于Celery如何配置以保留任务结果,因为它依赖于结果后端中“墓碑”的可用性。(“墓碑”是Celery文档中用于记录任务结束方式的数据块的术语。)如果task_ignore_result
设置为True
,则根本无法使用AsyncResult
。更棘手的问题是Celery默认情况下会过期“墓碑”。result_expires
设置默认为24小时。因此,如果您启动一个任务,并将其ID记录到长期存储中,24小时后再创建一个AsyncResult
,状态将为PENDING
。
CELERY_TRACK_STARTED
, CELERY_IGNORE_RESULT
, CELERY_TASK_RESULT_EXPIRES
。每个Task
对象都有一个.request
属性,其中包含它的AsyncRequest
对象。因此,下面这行代码可以得到一个任务task
的状态:
task.AsyncResult(task.request.id).state
虽然这是一个老问题,但我最近遇到了同样的问题。
如果你想获取task_id,可以按照以下方式操作:
import celery
from celery_app import add
from celery import uuid
task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
现在你完全了解task_id是什么,并且可以使用它来获取AsyncResult:
# grab the AsyncResult
result = celery.result.AsyncResult(task_id)
# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf
# print the AsyncResult's status
print result.status
SUCCESS
# print the result returned
print result.result
4
apply_async
。 apply_async
返回的对象是一个AsyncResult
对象,其中包含Celery生成的任务ID。 - Louistask_id
。在您的评论中,您想象了一个超越“如何检查任务状态”和“如果您正在尝试获取task_id...” 的原因。如果您有此需求,则很好,但在此情况下并非如此。(此外,使用uuid()
生成任务ID绝对不会比Celery默认情况做得更多。) - Louis您也可以创建自定义状态并在任务执行期间更新其值。 这个例子来自文档:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
result = AsyncResult(task_id)
),但在Celery 4.x中不再工作(result = DownloadFileTask.AsyncResult(task_id)
)。显然,现在需要引用任务类,以便正确引导CELERY_RESULT_BACKEND
。否则,由于某种原因,将使用DisabledBackend
。 - Ranel Padoncelery_app.set_default()
也可以,而且更简单(即无需调整现有任务调用),因为它会自动绑定完全配置/引导的应用程序作为默认值,包括在隔离的 AsyncResult()
调用中。 - Ranel Padon2020年的答案:
#### tasks.py
@celery.task()
def mytask(arg1):
print(arg1)
#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
state = process.state
return f"Thanks for your patience, your job {process.task_id} \
is being processed. Status {state}"
vi my_celery_apps/app1.py
app = Celery(worker_name)
from my_celery_apps.app1 import app
app.AsyncResult(taskid)
try:
if task.state.lower() != "success":
return
except:
""" do something """
尝试:
task.AsyncResult(task.request.id).state
引发意外错误:KeyError('exc_type',)
我在
中找到了有用的信息。
对于我的情况,我正在检查Celery是否正在运行。
inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = 'FAILURE'
else:
state = str(task.state)
你可以使用检查工具来满足你的需求。