Python Celery monitoring events are not being triggered.


I have a project directory:

azima:
    __init__.py
    main.py
    tasks.py
    monitor.py

tasks.py

from .main import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

main.py

from celery import Celery

app = Celery('azima', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0', include=['azima.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

monitor.py

from .main import app

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    def announce_succeeded_tasks(event):
        print('task succeeded')
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')

    def worker_online_handler(event):
        state.event(event)
        print("New worker gets online")
        print(event['hostname'], event['timestamp'], event['freq'], event['sw_ver'])

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                'task-succeeded': announce_succeeded_tasks,
                'worker-online': worker_online_handler,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    # app = Celery('azima')
    my_monitor(app)

I start the Celery worker with

celery -A azima.main worker -l INFO

and start monitor.py with

python -m azima.monitor

Only the worker-online event is triggered; other events such as task-succeeded are never triggered or handled.


What am I missing?

3 Answers


Enable worker task events with the CLI option -E (--task-events) and try capturing all events:

def my_monitor(app):
    def on_event(event):
        print("Event.type", event.get('type'))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)
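
For example, the worker command from the question becomes:

celery -A azima.main worker -l INFO -E

With -E the worker emits task events, so the receiver above should then see task-succeeded and task-failed alongside worker-online.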


By default, Celery workers do not send events. Like most optional features, however, this can be turned on, either by enabling worker_send_task_events in the configuration or by running the Celery worker with the -E flag.
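
A minimal sketch of the configuration route, extending the app.conf.update() call from the question's main.py (Celery 4+ lowercase setting names assumed; task_send_sent_event is an optional extra, not required for the succeeded/failed handlers):

app.conf.update(
    result_expires=3600,
    worker_send_task_events=True,  # let the worker emit task-succeeded, task-failed, ...
    task_send_sent_event=True,     # optional: also emit task-sent events from the client
)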



Comparing your code with the Flower source code:

try_interval = 1
while True:
    try:
        try_interval *= 2

        with self.capp.connection() as conn:
            recv = EventReceiver(conn,
                                 handlers={"*": self.on_event},
                                 app=self.capp)
            try_interval = 1
            logger.debug("Capturing events...")
            recv.capture(limit=None, timeout=None, wakeup=True)
    except (KeyboardInterrupt, SystemExit):
        try:
            import _thread as thread
        except ImportError:
            import thread
        thread.interrupt_main()
    except Exception as e:
        logger.error("Failed to capture events: '%s', "
                        "trying again in %s seconds.",
                        e, try_interval)
        logger.debug(e, exc_info=True)
        time.sleep(try_interval)

there are two differences (see the sketch after this list):

  1. The Celery app is missing from the EventReceiver.
  2. The infinite loop (while True); although I believe the capture method blocks and waits for events, the loop is only there to recover from errors.
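
A rough, untested sketch applying both points to the question's monitor.py (the retry/back-off structure is borrowed from the Flower snippet above; the worker still has to run with -E for task events to arrive):

import time

from celery.events import EventReceiver

from .main import app


def my_monitor(app):
    def on_event(event):
        print("Event.type", event.get('type'))

    try_interval = 1
    while True:
        try:
            try_interval *= 2
            with app.connection() as connection:
                # Difference 1: pass the Celery app explicitly to the receiver.
                recv = EventReceiver(connection,
                                     handlers={'*': on_event},
                                     app=app)
                try_interval = 1
                # capture() blocks and dispatches incoming events to the handlers.
                recv.capture(limit=None, timeout=None, wakeup=True)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception as exc:
            # Difference 2: the outer loop restarts capturing after an error
            # (for example a lost broker connection), with a growing back-off.
            print(f"Failed to capture events: {exc!r}, retrying in {try_interval} seconds.")
            time.sleep(try_interval)


if __name__ == '__main__':
    my_monitor(app)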
