在Celery中获取队列中任务列表

216

我该如何检索一个队列中尚未处理的任务列表?


3
RabbitMQ,但我想在Python中检索此列表。 - bradley.ayers
19个回答

240

编辑:查看其他答案获取队列中任务的列表。

你应该看这里:Celery指南-检查工作进程

基本上就是这样:

my_app = Celery(...)

# Inspect all nodes.
i = my_app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

根据您的需求而定


16
我试过了,但速度真的很慢(约1秒)。我在一个tornado应用程序中同步使用它来监控进度,所以必须要快。 - julienfr112
70
这不会返回队列中尚未处理的任务列表。 - Ed J
12
使用 i.reserved() 方法获取已排队任务的列表。 - dwitvliet
11
在指定 worker 时,我必须使用列表作为参数:inspect(['celery@Flatty'])。相较于 inspect(),速度大大提高。 - Adversus
13
这并没有回答问题。我不知道为什么这个答案被接受了... :) - DejanLekic
显示剩余9条评论

65

如果您正在使用Celery+Django,最简单的方法是直接从终端使用命令来检查任务,在您的虚拟环境中或者使用完整路径到Celery:

文档: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

如果你正在使用Celery+RabbitMQ,你可以使用以下命令检查队列列表

更多信息https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

17
如果您有一个明确定义的项目,您可以使用 celery -A my_proj inspect reserved - sashaboulouds
11
这句话的意思是:“这又没有回答问题。” - DejanLekic
我在这里是因为我的Celery服务器目前负载过重。这种方法并没有帮助,因为它只是卡住了。所以像app.control.inspect().active()这样的操作也没有用,正如另一个答案中所提到的。我只是想终止一些任务,让我的服务器恢复正常运行... - undefined

56

如果您正在使用RabbitMQ,请在终端中使用以下命令:

sudo rabbitmqctl list_queues

它将打印出队列列表以及挂起任务的数量。例如:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

右侧列中的数字是队列中任务的数量。在上面的例子中,Celery队列有166个待处理任务。


2
当我拥有 sudo 特权时,我很熟悉这个问题,但我希望一个非特权的系统用户也能够检查 - 有什么建议吗? - sage
此外,如果您想稍后处理该数字(例如用于统计),您可以通过 grep -e "^celery\s" | cut -f2 将其导出,以提取出 166 - jamesc

38
如果您不使用优先任务,那么如果您在使用Redis,这实际上是相当简单的。要获取任务计数:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

但是,优先任务在Redis中使用不同的键,因此整个情况稍微复杂一些。整个情况是,您需要查询每个任务的优先级。在Python中(来自Flower项目),代码如下:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

如果你想得到一个实际的任务,你可以使用类似这样的东西:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

接下来,您需要对返回的列表进行反序列化。在我的情况下,我能够使用以下内容完成此操作:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

请注意,反序列化可能需要一些时间,并且您需要调整上述命令以适应不同的优先级。


在生产中使用后,我发现它会因Celery的设计问题而无法处理优先任务 - mlissner
2
我已更新上述内容以处理优先任务。 进展顺利! - mlissner
10
需要说明的是,默认使用的DATABASE_NUMBER0,而QUEUE_NAMEcelery,因此redis-cli -n 0 llen celery将返回排队消息的数量。 - Vineet Bansal
2
它总是返回0。 - Mark Mishyn
2
我遇到的问题是:如果您撤销了在队列中等待的celery任务,它仍会留在redis队列中。而且lrange返回的任务数量不正确。 - Rom1
显示剩余2条评论

16

要从后端检索任务,请使用此

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

4
但是,“jobs”命令只会显示排队中的任务数量。 - bitnik
3
请参考 https://dev59.com/VG035IYBdhLWcg3wPNZk#57807913 中的相关答案,该答案提供了任务名称。 - Caleb Syring

15

一个带有JSON序列化的Redis复制粘贴解决方案:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

它可以与Django一起使用。只是不要忘记更改yourproject.celery


5
如果您正在使用pickle序列化程序,则可以将body =这行改为body = pickle.loads(base64.b64decode(j['body'])) - Jim Hunziker
我遇到了一个错误!模块 'celery.app' 没有 'pool' 属性。 - Sadegh-khan

12

这在我的应用程序中有效:

def get_queued_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

active_jobs 将是一个字符串列表,对应于队列中的任务。

不要忘记将 CELERY_APP_INSTANCE 替换为您自己的。

感谢 @ashish 在这里的回答指引我朝正确的方向前进:https://dev59.com/VG035IYBdhLWcg3wPNZk#19465670


@daveoncode,我认为这并不足以让我提供有帮助的回答。你可以开一个新问题。如果你明确指出你想用Python检索信息,我认为这不会是重复的问题。我会回到https://dev59.com/VG035IYBdhLWcg3wPNZk#19465670,这是我回答的基础,并确保它首先能够正常工作。 - Caleb Syring
2
@CalebSyring 这是第一个真正向我展示排队任务的方法。非常好。唯一的问题是,列表附加似乎不起作用。有什么想法可以让回调函数写入列表吗? - Varlor
@CalebSyring 这个回答太棒了!谢谢!你有什么办法可以只获取有限数量的结果吗?我有一个包含数百万个任务的队列,我想检查其中一些任务以查看里面有什么。我尝试更改“range”,但没有帮助。 - Sarang
dump_message 函数中,您可以检查 active_jobs 的长度,并且仅在该列表的元素少于您所需的数量时才附加任务。 - Helge Schneider
谢谢您的回答。有没有一种非破坏性的方法来实现这个?我的意思是,这对我来说是有效的,因为它返回了一个作业列表,但是“drain_events”方法似乎将队列中的作业转换为“完成”,因此它们对于工作者或以后的检查不可用。 - Owen
显示剩余6条评论

7

celery inspect模块似乎只能从工作进程的角度查看任务。如果您想查看队列中的消息(尚未被工作进程拉取),我建议使用pyrabbit,它可以与rabbitmq http api接口交互,从队列中检索各种信息。

此处可找到示例: 使用Celery(RabbitMQ,Django)检索队列长度


5

我认为获取等待的任务的唯一方法是保持一个已启动任务列表,并在任务启动后从列表中将其删除。

使用rabbitmqctl和list_queues,您可以获得等待任务的概述,但无法获取任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果您想要包括正在处理但尚未完成的任务,则可以保留您的任务列表并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

或者您可以让Celery使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务没有在其中。


5
据我所知,Celery没有提供API来检查在队列中等待的任务。这取决于代理。例如,如果您使用Redis作为代理,则检查等待在默认队列 celery 中的任务就像以下简单步骤一样:
  1. 连接到代理
  2. 列出 celery 列表中的项目(例如LRANGE命令)
请记住,这些都是等待可用工人拾取的任务。您的群集可能有些任务正在运行 - 它们不会出现在此列表中,因为它们已经被拾取。
检索特定队列中的任务的过程取决于代理。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接