对此的任何指导将不胜感激。基本上,我需要一个像这样的方法:
def celery_is_alive():
from celery.task.control import inspect
return bool(inspect().registered()) # is this right??
编辑:看起来 celery 2.3.3 上甚至没有 registered() 方法(即使 2.1 文档中列出了它)。也许 ping 是正确的答案。
编辑:Ping 看起来也不是我想象中的那样,所以仍然不确定答案。