我有一个项目目录:
azima:
__init__.py
main.py
tasks.py
monitor.py
tasks.py
from .main import app
@app.task
def add(x, y):
    """Celery task: return the result of ``x + y``."""
    total = x + y
    return total
@app.task
def mul(x, y):
    """Celery task: return the result of ``x * y``."""
    product = x * y
    return product
@app.task
def xsum(numbers):
    """Celery task: return the built-in ``sum`` of the items in *numbers*."""
    total = sum(numbers)
    return total
main.py
from celery import Celery
# Celery application for the ``azima`` package. The same local Redis database
# is used as both the message broker and the result backend, and the task
# module is registered through ``include``.
app = Celery(
    'azima',
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
    # Workers publish task-* events (task-succeeded, task-failed, ...) only
    # when task events are enabled. Without this setting (or the worker's
    # ``-E`` / ``--task-events`` flag) an event receiver only ever sees
    # worker-* events such as worker-online.
    worker_send_task_events=True,
)

if __name__ == '__main__':
    app.start()
monitor.py
from .main import app
def my_monitor(app):
    """Capture events from *app*'s broker connection and print notifications.

    Every received event is fed into an in-memory ``app.events.State``.
    Dedicated handlers additionally print a line for failed tasks, succeeded
    tasks and workers coming online. The capture call is made with no limit
    and no timeout.
    """
    tracker = app.events.State()

    def _record_and_fetch(evt):
        # Update the state first so the task entry exists before the lookup.
        tracker.event(evt)
        return tracker.tasks.get(evt['uuid'])

    def on_task_failed(evt):
        failed = _record_and_fetch(evt)
        print(f'TASK FAILED: {failed.name}[{failed.uuid}]')

    def on_task_succeeded(evt):
        print('task succeeded')
        finished = _record_and_fetch(evt)
        print(f'TASK SUCCEEDED: {finished.name}[{finished.uuid}]')

    def on_worker_online(evt):
        tracker.event(evt)
        print("New worker gets online")
        print(evt['hostname'], evt['timestamp'], evt['freq'], evt['sw_ver'])

    with app.connection() as conn:
        # '*' is the catch-all entry: events without a dedicated handler
        # are still recorded in the state.
        dispatch = {
            'task-failed': on_task_failed,
            'task-succeeded': on_task_succeeded,
            'worker-online': on_worker_online,
            '*': tracker.event,
        }
        receiver = app.events.Receiver(conn, handlers=dispatch)
        receiver.capture(limit=None, timeout=None, wakeup=True)
if __name__ == "__main__":
    # Script entry point: start capturing events with the Celery
    # application imported at the top of this module.
    my_monitor(app)
启动Celery工作进程,使用
celery -A azima.main worker -l INFO
并使用以下命令启动 monitor.py:
python -m azima.monitor
只有 worker-online 事件被触发,而其他事件(如 task-succeeded)没有被触发或处理。
我错过了什么?