Celery / RabbitMQ / Django无法运行任务

3

我希望有人可以帮助我,因为我在 Stack Overflow 上找不到解决我的问题的方法。我正在运行一个 Django 项目,并安装了 Supervisor、RabbitMQ 和 Celery。RabbitMQ 已经启动并运行,Supervisor 确保我的 celerybeat 在运行,但是虽然它记录了节拍已经开始并且每 5 分钟发送任务(见下文),但任务实际上从未执行:

我的 supervisor 程序配置:

[program:nrv_twitter]
; Set full path to celery program if using virtualenv
command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery beat -A app --loglevel=INFO --pidfile=/tmp/nrv-celerybeat.pid --schedule=/tmp/nrv-celerybeat-schedule

; Project dir
directory=/Users/tsantor/Projects/NRV/nrv

; Logs
stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celerybeat_twitter.log
redirect_stderr=true

autorestart=true
autostart=true
startsecs=10
user=tsantor

; if rabbitmq is supervised, set its priority higher so it starts first
priority=999

以下是上述程序的日志输出内容:
[2014-12-16 20:29:42,293: INFO/MainProcess] beat: Starting...
[2014-12-16 20:34:08,161: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:39:08,186: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:44:08,204: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:49:08,205: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)
[2014-12-16 20:54:08,223: INFO/MainProcess] Scheduler: Sending due task gettweets-every-5-mins (twitter.tasks.get_tweets)

这是我的celery.py设置文件:
from datetime import timedelta

BROKER_URL = 'amqp://guest:guest@localhost//'

CELERY_DISABLE_RATE_LIMITS = True

CELERYBEAT_SCHEDULE = {
    'gettweets-every-5-mins': {
        'task': 'twitter.tasks.get_tweets',
        'schedule': timedelta(seconds=300) # 300 = every 5 minutes
    },
}

这里是我的celeryapp.py文件:
from __future__ import absolute_import
import os
from django.conf import settings
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

app = Celery('app')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

这是我的twitter/tasks.py文件:
from __future__ import absolute_import
import logging
from celery import shared_task
from twitter.views import IngestTweets

log = logging.getLogger('custom.log')

@shared_task
def get_tweets():
    """
    Get tweets and save them to the DB
    """
    instance = IngestTweets()
    IngestTweets.get_new_tweets(instance)

    log.info('Successfully ingested tweets via celery task')
    return True
< p > get_tweets 方法从未被执行,但我知道它能够正常运行,因为我可以手动执行 get_tweets 并得到正确的结果。

我已经花费两天的时间尝试弄清楚为什么它会发送由任务,但却不执行它们?任何帮助都将不胜感激。提前致以感谢。


你有一个运行中的celery工作程序吗? - user2097159
是的,正如我之前提到的,我有一个通过supervisor管理的celery beat正在运行。从上面的日志中可以看出,它每5分钟运行一次。我在上面添加了更多的信息以提供更多的上下文。 - tsantor
等等,你是说我不仅需要使用supervisor运行celery beat进程,还需要添加另一个supervisor程序来确保celery worker也在运行? - tsantor
2个回答

2

感谢user2097159指引我正确的方向,我不知道我还必须使用supervisor运行一个worker。我以为只需要一个worker或者一个beat,但现在我明白了,我必须有一个worker来处理任务和一个beat定期触发任务。

下面是缺失的supervisor worker配置:

[program:nrv_celery_worker]
; Worker
command=/Users/tsantor/.virtualenvs/nrv_env/bin/celery worker -A app --loglevel=INFO

; Project dir
directory=/Users/tsantor/Projects/NRV/nrv

; Logs
stdout_logfile=/Users/tsantor/Projects/NRV/nrv/logs/celery_worker.log
redirect_stderr=true

autostart=true
autorestart=true
startsecs=10
user=tsantor
numprocs=1

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 600

; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; if rabbitmq is supervised, set its priority higher
; so it starts first
priority=998

我随后重置了RabbitMQ队列。现在,通过supervisor管理beat和worker程序,一切都按预期运行。希望这能帮助其他人。


0

您需要同时启动一个 worker 进程和一个 beat 进程。您可以创建单独的进程如 tsantor 回答中所述,或者您可以创建一个带有 worker 和 beat 的单个进程。这在开发过程中可能更方便(但不建议用于生产)。

Celery 文档中的 "Starting the scheduler"

您也可以通过启用 workers 的 -B 选项将 beat 嵌入到 worker 中,如果您永远不会运行多个 worker 节点,那么这很方便,但是它不常用,因此不建议用于生产环境:

$ celery -A proj worker -B

有关在 Supervisor 配置文件中的表达式,请参见 https://github.com/celery/celery/tree/master/extra/supervisord/(从 "Daemonization" 链接)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接