如何使用Python检查Celery/Supervisor是否正在运行

19
如何在Python中编写脚本以输出Celery是否在机器(Ubuntu)上运行?
我的用例是,我有一个简单的Python文件,其中包含一些任务。我没有使用Django或Flask。我使用supervisor来运行任务队列。例如,
tasks.py
from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
    return a + b

主管:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

这一切都有效,我现在希望有一个页面来检查celery/supervisor进程是否正在运行。例如使用Flask编写类似下面的代码,使我可以托管页面并返回200状态,以实现负载均衡。

例如...

check_status.py

from flask import Flask

app = Flask(__name__)

@app.route('/')
def status_check():

    #check supervisor is running
    if supervisor:
         return render_template('up.html')
    else:
        return render_template('down.html')

if __name__ == '__main__':
    app.run()
7个回答

26

更新于09/2020: Jérôme在这里更新了针对Celery 4.3的答案: https://dev59.com/FlwX5IYBdhLWcg3wvRkf#57628025

你可以通过导入celery.bin.celery包,通过代码运行celery status命令:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms

app = celery.Celery('tasks', broker='redis://')

status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()

def celery_is_up():
    try:
        status.run()
        return True
    except celery.bin.base.Error as e:
        if e.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise e

if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

4
今天我已经没有赞了,所以我通过评论来点赞这种方法。 - sobolevn
我认为app = celery.Celery('tasks', broker='redis://')这行代码是不必要的。app变量在其他地方没有被使用到。 - djromero
@djromero,自从我上次检查它的工作原理以来已经有一段时间了,但是如果我没记错的话,在库内部实例化单例应用程序是必需的。如果您在其他地方使用celery并且它创建了celery实例,则不需要这样做。 - vgel
@vgel 不错的解决方案。那Celery Beat怎么样?谢谢。 - Milano
谢谢您提供的解决方案,这里只有一个问题,status.run()是否返回任何值,您将如何确保它是否起作用。它是基于引发的异常吗? - sattva_venu

2

使用子进程,不确定这是否是一个好主意:

>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True

2
你可以从supervisorctl status的输出中解析出进程状态
import subprocess

def is_celery_worker_running():
    ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
    if ctl_output == 'unix:///var/run/supervisor.sock no such file':
        # supervisord not running
        return False
    elif ctl_output == 'No such process celery_worker':
        return False
    else:
        state = ctl_output.split()[1]
        return state == 'RUNNING'

2
受到@vgel的答案的启发,使用Celery 4.3.0。
import celery
import celery.bin.base
import celery.bin.control
import celery.platforms

# Importing Celery app from my own application
from my_app.celery import app as celery_app


def celery_running():
    """Test Celery server is available

    Inspired by https://dev59.com/FlwX5IYBdhLWcg3wvRkf#33545849
    """
    status = celery.bin.control.status(celery_app)
    try:
        status.run()
        return True
    except celery.bin.base.Error as exc:
        if exc.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise


if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

2
这行代码: status = celery.bin.control.status(celery_app) 出现了错误: TypeError: 'Celery' object is not iterable - Anders_K
我正在使用版本5.0.5,但似乎文档消失了。 https://docs.celeryproject.org/en/v5.0.5/reference/celery.bin.control.html - Anders_K
你找到了5.0版本的解决方案吗? - tuuttuut

1

Supervisor提供了一个简洁的Web用户界面。您可以使用它,只需在supervisor配置中启用即可。关键是查找[inet_http_server]。

您甚至可以查看该部分的源代码,以获取实现自己想法的灵感。


0
这不适用于celery,但是对于任何想要检查supervisord是否正在运行的人,请检查您的supervisord.conf配置文件中为supervisord定义的pidfile是否存在。如果存在,则正在运行;如果不存在,则未运行。默认的pidfile是/tmp/supervisord.pid,这是我下面使用的内容。
import os
import sys

if os.path.isfile("/tmp/supervisord.pid"):
    print "supervisord is running."
    sys.exit()

-2
在我的经验中,我会设置一条消息来跟踪任务是否已完成,这样队列就可以负责重试任务。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接