我该如何检索一个队列中尚未处理的任务列表?
编辑:查看其他答案获取队列中任务的列表。
你应该看这里:Celery指南-检查工作进程
基本上就是这样:
my_app = Celery(...)
# Inspect all nodes.
i = my_app.control.inspect()
# Show the items that have an ETA or are scheduled for later processing
i.scheduled()
# Show tasks that are currently active.
i.active()
# Show tasks that have been claimed by workers
i.reserved()
根据您的需求而定
i.reserved()
方法获取已排队任务的列表。 - dwitvlietinspect(['celery@Flatty'])
。相较于 inspect()
,速度大大提高。 - Adversus如果您正在使用Celery+Django,最简单的方法是直接从终端使用命令来检查任务,在您的虚拟环境中或者使用完整路径到Celery:
文档: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers
$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled
如果你正在使用Celery+RabbitMQ,你可以使用以下命令检查队列列表:
更多信息:https://linux.die.net/man/1/rabbitmqctl
$ sudo rabbitmqctl list_queues
celery -A my_proj inspect reserved
。 - sashaboulouds如果您正在使用RabbitMQ,请在终端中使用以下命令:
sudo rabbitmqctl list_queues
它将打印出队列列表以及挂起任务的数量。例如:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
celerey_mail_worker@torob2.celery.pidbox 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
右侧列中的数字是队列中任务的数量。在上面的例子中,Celery队列有166个待处理任务。
grep -e "^celery\s" | cut -f2
将其导出,以提取出 166
。 - jamescredis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
但是,优先任务在Redis中使用不同的键,因此整个情况稍微复杂一些。整个情况是,您需要查询每个任务的优先级。在Python中(来自Flower项目),代码如下:
PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]
def make_queue_name_for_pri(queue, pri):
"""Make a queue name for redis
Celery uses PRIORITY_SEP to separate different priorities of tasks into
different queues in Redis. Each queue-priority combination becomes a key in
redis with names like:
- batch1\x06\x163 <-- P3 queue named batch1
There's more information about this in Github, but it doesn't look like it
will change any time soon:
- https://github.com/celery/kombu/issues/422
In that ticket the code below, from the Flower project, is referenced:
- https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135
:param queue: The name of the queue to make a name for.
:param pri: The priority to make a name with.
:return: A name for the queue-priority pair.
"""
if pri not in DEFAULT_PRIORITY_STEPS:
raise ValueError('Priority not in priority steps')
return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
(queue, '', '')))
def get_queue_length(queue_name='celery'):
"""Get the number of tasks in a celery queue.
:param queue_name: The name of the queue you want to inspect.
:return: the number of items in the queue.
"""
priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
DEFAULT_PRIORITY_STEPS]
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
return sum([r.llen(x) for x in priority_names])
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
接下来,您需要对返回的列表进行反序列化。在我的情况下,我能够使用以下内容完成此操作:
r = redis.StrictRedis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
请注意,反序列化可能需要一些时间,并且您需要调整上述命令以适应不同的优先级。
DATABASE_NUMBER
是0
,而QUEUE_NAME
是celery
,因此redis-cli -n 0 llen celery
将返回排队消息的数量。 - Vineet Bansal要从后端检索任务,请使用此
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
一个带有JSON序列化的Redis复制粘贴解决方案:
def get_celery_queue_items(queue_name):
import base64
import json
# Get a configured instance of a celery app:
from yourproject.celery import app as celery_app
with celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
decoded_tasks = []
for task in tasks:
j = json.loads(task)
body = json.loads(base64.b64decode(j['body']))
decoded_tasks.append(body)
return decoded_tasks
它可以与Django一起使用。只是不要忘记更改yourproject.celery
。
body =
这行改为body = pickle.loads(base64.b64decode(j['body']))
。 - Jim Hunziker这在我的应用程序中有效:
def get_queued_jobs(queue_name):
connection = <CELERY_APP_INSTANCE>.connection()
try:
channel = connection.channel()
name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
active_jobs = []
def dump_message(message):
active_jobs.append(message.properties['application_headers']['task'])
channel.basic_consume(queue=queue_name, callback=dump_message)
for job in range(jobs):
connection.drain_events()
return active_jobs
finally:
connection.close()
active_jobs
将是一个字符串列表,对应于队列中的任务。
不要忘记将 CELERY_APP_INSTANCE 替换为您自己的。
感谢 @ashish 在这里的回答指引我朝正确的方向前进:https://dev59.com/VG035IYBdhLWcg3wPNZk#19465670
dump_message
函数中,您可以检查 active_jobs
的长度,并且仅在该列表的元素少于您所需的数量时才附加任务。 - Helge Schneidercelery inspect模块似乎只能从工作进程的角度查看任务。如果您想查看队列中的消息(尚未被工作进程拉取),我建议使用pyrabbit,它可以与rabbitmq http api接口交互,从队列中检索各种信息。
此处可找到示例: 使用Celery(RabbitMQ,Django)检索队列长度
我认为获取等待的任务的唯一方法是保持一个已启动任务列表,并在任务启动后从列表中将其删除。
使用rabbitmqctl和list_queues,您可以获得等待任务的概述,但无法获取任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
如果您想要包括正在处理但尚未完成的任务,则可以保留您的任务列表并检查它们的状态:
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
或者您可以让Celery使用CELERY_RESULT_BACKEND存储结果,并检查哪些任务没有在其中。
celery
中的任务就像以下简单步骤一样:
celery
列表中的项目(例如LRANGE命令)