Celery / RabbitMQ - 查找未确认消息数量 - 未被确认的消息

4
我正在尝试弄清楚如何获取未确认消息的信息。它们存储在哪里?通过使用celery inspect,似乎一旦消息被确认,它会处理并且您可以跟踪状态。假设您有结果后端,那么您可以看到它的结果。但是从您应用延迟到它被确认的时间,它就处于黑洞中。
以下是需要回答的问题:
  1. noAcks 存储在哪里?
  2. 我如何查找 noAcks 列表的“深度”?换句话说,有多少个 noAcks,我的任务在列表中位于哪里。
虽然这与问题不完全相关,但这是我所涉及的内容。
from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# Now if I want "RECEIVED" jobs.. 
data = inspect.reserved()

# or "ACTIVE" jobs.. 
data = inspect.active()

# or "REVOKED" jobs.. 
data = inspect.revoked()

# or scheduled jobs.. (Assuming these are time based??)
data = inspect.scheduled()

# FILL ME IN FOR UNACK JOBS!!
# data = inspect.??

# This will never work for tasks that aren't in one of the above buckets..
pprint.pprint(inspect.query_task([tasks]))

非常感谢您在这方面提供的建议和帮助。

2个回答

4

它们是在inspect.reserved()中具有'acknowleged': False的任务。

from celery.app import app_or_default

app = app_or_default()
inspect = app.control.inspect()

# those that have been sent to a worker and are thus reserved
# from being sent to another worker, but may or may not be acknowledged as received by that worker
data = inspect.reserved()

{'celery.tasks': [{'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '527961d4-639f-4002-9dc6-7488dd8c8ad8',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': '09d5b726-269e-48d0-8b0e-86472d795906',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None},
              {'acknowledged': False,
               'args': '[]',
               'delivery_info': {'exchange': 'tasks',
                                 'priority': None,
                                 'routing_key': 'celery'},
               'hostname': 'celery.tasks',
               'id': 'de6d399e-1b37-455c-af63-a68078a9cf7c',
               'kwargs': '{}',
               'name': 'globalapp.tasks.task_loop_tick',
               'time_start': None,
               'worker_pid': None}],
 'fastlane.tasks': [],
 'images.tasks': [],
 'mailer.tasks': []}

很好 - 这个在哪里有文档记录? - rh0dium
我刚刚从你的代码开始,然后注意到未确认的任务在保留状态。昨天我发现如果安装celery 3.0.11,它会安装billiard 3.x,但实际上它与celery 3.0不兼容;因此任务没有被确认并且没有启动时间。billiard < 3是可以工作的。故事的寓意:始终锁定或冻结您的依赖关系。 - Chris Sattinger
太好了。谢谢。我不知道为什么以前没有看到或想到这个。你能确认一下这是否与Celery 3.1x(当前版本)兼容吗? - rh0dium
Celery是一个混乱的选项、文档、多种方法和许多版本的困惑。不知道它是否适用于3.1,抱歉 :) - Chris Sattinger

3
经过多个小时的审查,我得出结论:仅使用纯celery不可能实现。然而,可以松散地跟踪整个流程。以下是我用来查找未确认计数的代码。大部分可以使用celery中的工具完成。 然而,我仍然无法通过id查询底层未确认任务,但是... 如果您安装了RabbitMQ管理插件,则可以查询API。
    data = {}
    base_url = "http://localhost:55672"
    url = base_url + "/api/queues/{}/".format(vhost)
    req = requests.get(url, auth=(settings.RABBITMQ_USER, settings.RABBITMQ_PASSWORD))
    if req.status_code != 200:
        log.error(req.text)
    else:
        request_data = req.json()
        for queue in request_data:
            # TODO if we know what queue the task is then we can nail this.
            if queue.get('name') == "celery":
                data['state'] = "Unknown"
                if queue.get('messages'):
                    data['messages'] = queue.get('messages')
                    data['messages_ready'] = queue.get('messages_ready')
                    data['messages_unacknowledged'] = queue.get('messages_unacknowledged')
                break
    return data 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接