检测Celery是否可用/运行

68
我正在使用Celery管理异步任务。偶尔,Celery进程会崩溃,导致无法执行任何任务。我想检查Celery的状态,确保一切正常运行,如果检测到问题,向用户显示错误消息。根据Celery Worker文档,我似乎可以使用pinginspect来实现这一点,但ping感觉像是 hacky的方式,并且不清楚inspect应该如何使用(如果 inspect().registered() 为空?)。
对此的任何指导将不胜感激。基本上,我需要一个像这样的方法:
def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??

编辑:看起来 celery 2.3.3 上甚至没有 registered() 方法(即使 2.1 文档中列出了它)。也许 ping 是正确的答案。

编辑:Ping 看起来也不是我想象中的那样,所以仍然不确定答案。


以下的答案对您无效吗?作为一个有类似问题需要解决的人,我很希望得到一些确认。 - kojiro
3
我知道这是一个老问题,但你能否详细说明为什么“ping”不是答案?看起来,“ping”恰好是正确的答案,它简单地回应“pong”,表明工作进程还在运行。 - Tim Tisdall
10个回答

66

这是我一直在使用的代码。 celery.task.control.Inspect.stats() 返回一个包含当前可用 workers 的许多详细信息的字典,如果没有运行任何 worker,则返回 None,或者如果无法连接到消息代理则引发 IOError。我正在使用 RabbitMQ - 其他消息系统可能会略有不同。这适用于 Celery 2.3.x 和 2.4.x;我不确定它可以向后兼容多久。

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = { ERROR_KEY: 'No running Celery workers were found.' }
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = { ERROR_KEY: msg }
    except ImportError as e:
        d = { ERROR_KEY: str(e)}
    return d

10
我发现每次运行上述代码都会向rabbitmq添加两个reply.celery.pidbox队列,这导致了rabbitmq内存使用量的递增。 - kojiro
只是为了完整性,您可以使用以下命令来检查调度程序是否已启动:sudo service celerybeatd status。 - Adarsh
2
这对我没有用。使用Redis作为代理时,当Redis不可用且没有Celery工作程序运行时,“insp.stats()”会阻塞。 - Giannis
1
这对我不起作用。我猜Celery的API已经改变了(现在是4.2版本)。 - Ouss
对于4.2及以上版本,您可以查看https://dev59.com/ZGoy5IYBdhLWcg3wguSS。 - Compro Prasad

22

来自celery 4.2的文档:

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    availability = i.ping()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'availability': availability,
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result

当然,您可以/应该通过错误处理来改进代码...


3
仅需检查可用性,还可以使用 i.ping(),在失败时返回 None - Tim Tisdall
2
谢谢Tim。我将“可用性”添加到了函数中。 - Ouss

13

7
以下内容适用于我:

以下内容适用于我:

import socket
from kombu import Connection

celery_broker_url = "amqp://localhost"

try:
    conn = Connection(celery_broker_url)
    conn.ensure_connection(max_retries=3)
except socket.error:
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))

5
只要 RabbitMQ 在运行,我相信这个任务可以成功完成,而 Celery 的状态并不影响它。但是如果 Celery 失败了,进行这个检查是一个很好的办法,以确定是 RabbitMQ 出了问题还是其他原因导致的失败。 - Tim Tisdall
使用Redis作为后端进行测试,即使Redis没有运行,这将返回一个有效的连接对象。 - jwadsack

6

测试任何一个工作人员是否响应的一种方法是发送“ping”广播并在第一次响应成功后返回。

from .celery import app  # the celery 'app' created in your project

def is_celery_working():
    result = app.control.broadcast('ping', reply=True, limit=1)
    return bool(result)  # True if at least one result

这将广播一个“ping”,并等待最多一秒钟以获取响应。 一旦收到第一个响应,它将返回结果。 如果您想更快地获得False结果,则可以添加timeout参数以缩短等待时间。


3
我找到了一个优雅的解决方案:
from .celery import app
try:
    app.broker_connection().ensure_connection(max_retries=3)
except Exception as ex:
    raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex)))

如其他答案所述,我认为这只告诉您代理正在工作,而不是工人是否在运行。 - Tim Tisdall

2

1

您可以通过运行以下命令在终端上进行测试。

celery -A proj_name worker -l INFO

您可以随时查看您的celery运行情况。


1

运行 celery status 以获取状态。

当 celery 在运行时,

(venv) ubuntu@server1:~/project-dir$ celery status
->  celery@server1: OK

1 node online.

当没有celery worker在运行时,终端会显示以下信息。

(venv) ubuntu@server1:~/project-dir$ celery status
Error: No nodes replied within time constraint

1
以下脚本对我有效。
    #Import the celery app from project
    from application_package import app as celery_app
    def get_celery_worker_status():
        insp = celery_app.control.inspect()
        nodes = insp.stats()
        if not nodes:
            raise Exception("celery is not running.")
        logger.error("celery workers are: {}".format(nodes))
        return nodes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接