Celery(Redis)结果后端不起作用

23

我有一个使用Django的Web应用程序,我正在使用Celery进行一些异步任务处理。

对于Celery,我使用Rabbitmq作为代理,并使用Redis作为结果后端。

Rabbitmq和Redis运行在同一台Ubuntu 14.04服务器上,托管在本地虚拟机上。

Celery工作者正在远程计算机(Windows 10)上运行(没有工作者在Django服务器上运行)。

我有三个问题(我认为它们某种程度上是相关的!):

  1. 无论任务是否成功或失败,任务都保持“PENDING”状态。
  2. 任务失败时不会重试。尝试重试时,我收到以下错误:

reject requeue=False: [WinError 10061] No connection could be made because the target machine actively refused it

  1. 结果后端似乎没起作用。

我对我的设置也感到困惑,不确定这些问题确切来自哪里!

因此,这是我的设置:

my_app/settings.py

# region Celery Settings
CELERY_CONCURRENCY = 1
CELERY_ACCEPT_CONTENT = ['json']
# CELERY_RESULT_BACKEND = 'redis://:C@pV@lue2016@cvc.ma:6379/0'
BROKER_URL = 'amqp://soufiaane:C@pV@lue2016@cvc.ma:5672/cvcHost'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

CELERY_REDIS_HOST = 'cvc.ma'
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0
CELERY_RESULT_BACKEND = 'redis'
CELERY_RESULT_PASSWORD = "C@pV@lue2016"
REDIS_CONNECT_RETRY = True

AMQP_SERVER = "cvc.ma"
AMQP_PORT = 5672
AMQP_USER = "soufiaane"
AMQP_PASSWORD = "C@pV@lue2016"
AMQP_VHOST = "/cvcHost"
CELERYD_HIJACK_ROOT_LOGGER = True
CELERY_HIJACK_ROOT_LOGGER = True
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# endregion

我的应用程序/celery_settings.py

from __future__ import absolute_import
from django.conf import settings
from celery import Celery
import django
import os

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
django.setup()
app = Celery('CapValue', broker='amqp://soufiaane:C@pV@lue2016@cvc.ma/cvcHost', backend='redis://:C@pV@lue2016@cvc.ma:6379/0')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

my_app__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.

from .celery_settings import app as celery_app

我的应用程序\电子邮件\tasks.py

from __future__ import absolute_import
from my_app.celery_settings import app

# here i only define the task skeleton because i'm executing this task on remote workers !
@app.task(name='email_task', bind=True, max_retries=3, default_retry_delay=1)
def email_task(self, job, email):
    try:
        print("x")
    except Exception as exc:
        self.retry(exc=exc)

在工人方面,我有一个名为“tasks.py”的文件,其中包含任务的实际实现:

工人\tasks.py

from __future__ import absolute_import
from celery.utils.log import get_task_logger
from celery import Celery


logger = get_task_logger(__name__)
app = Celery('CapValue', broker='amqp://soufiaane:C@pV@lue2016@cvc.ma/cvcHost', backend='redis://:C@pV@lue2016@cvc.ma:6379/0')

@app.task(name='email_task', bind=True, max_retries=3, default_retry_delay=1)
def email_task(self, job, email):
    try:
        """
        The actual implementation of the task
        """
    except Exception as exc:
        self.retry(exc=exc)

然而我注意到的是:

  • 当我将工作进程中的代理设置更改为错误密码时,会出现无法连接到代理的错误。
  • 当我将工作进程中结果后端的设置更改为错误密码时,它会正常运行,好像一切都没问题。

什么可能导致这些问题?

编辑

在我的Redis服务器上,我已经启用了远程连接

/etc/redis/redis.conf

... bind 0.0.0.0 ...


2
看起来你的结果后端配置不正确。 - scytale
@scytale 这样怎么样? - Soufiaane
因为看起来是这样的。尝试从Django和Celery服务器查询Redis。我不熟悉如何配置Django/Celery - 你在my_app/settings.pymy_app/celery_settings.py中重复了Celery配置,并且你没有一个celeryconfig.py(这在独立的Celery中很常见)- 这是推荐的做法吗?你使用的是哪个文档? - scytale
@scytale 1 - 我从Django和Celery工作机器上运行以下命令: `>>> import redis
pool = redis.ConnectionPool(host='cvc.ma', port=6379, db=0, password='C@pV@lue2016') r = redis.Redis(connection_pool=pool) r.set('foo', 'bar') True` 因此,Redis配置似乎正常。 2 - 只有在将“my_app/settings.py”中的一些设置复制到“my_app/celery_settings.py”后,我才成功让Celery工作者和Django服务器一起工作。
- Soufiaane
你看到rabbitmq里的任务了吗?运行 sudo rabbitmqctl list_queues -p cvcHost。我认为结果后端似乎没有工作,因为你的任务没有完成。 - Adi Krishnan
显示剩余4条评论
3个回答

11

我猜测你的问题出在密码上。 你的密码中有 @ 符号,这可能会被解释为 user:passhost 之间的分隔符。

工作进入挂起状态是因为无法正确连接到代理。 来自Celery文档: http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

待定 任务正在等待执行或状态未知。任何未知的任务ID都被认为处于待定状态。


我为了测试目的已经将我的密码更改为简单密码,但仍然遇到了同样的问题。如果工作人员无法连接到代理服务器,那么我的任务会如何处理? - Soufiaane
让我们进行一些实验。请尝试从Celery实例中删除结果后端。尝试添加ignore_result = True。http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.ignore_result - Gal Ben David
抱歉,Gal Ben David,我离开工作了。所以我尝试使用 ignore_result = True 并删除结果后端。工作人员指出结果已被禁用,当我提交任务时尝试 task.state 时会出现错误:AttributeError: 'DisabledBackend' 对象没有 '_get_task_meta_for' 属性。 - Soufiaane
在这种情况下,“apply”方法不起作用,因为它等待任务执行完成并返回返回值,而此时该功能已被禁用。无论如何,尝试使用“apply_async”进行操作,并检查任务是否成功运行。如果是这种情况,显然问题与后端服务器有关,我们可以通过逐个隔离部分来从那一点继续前进。 - Gal Ben David
我没有使用apply方法,我一开始就使用了apply_async。 - Soufiaane
有点奇怪,当你等待结果并且有结果后端时,会出现这个错误。你能否发布整个代码,以便我可以在运行系统中尝试解决它? - Gal Ben David

3

我曾经搭建过一个环境,其中“单实例服务器”(即开发和本地主机服务器)可以正常工作,但当redis服务器位于另一台服务器上时就无法工作。celery任务可以正常工作,但获取结果时出现了以下错误:

Error 111 connecting to localhost:6379. Connection refused.

只需将该设置添加到Django中即可使其工作:
CELERY_RESULT_BACKEND = 'redis://10.10.10.10:6379/0'

如果不提供此参数,似乎默认使用本地主机来获取任务结果。


我看到了相同的错误,即使我的设置是这样的(我想使用redis),我得到了[2018-02-27 13:18:18,494:ERROR / MainProcess] consumer:无法连接到amqp:// guest:** @ 127.0.0.1:5672 //:[Errno 111]拒绝连接。28.00秒后再试... - ilhnctn
我正在使用docker和docker-compose,没有自定义防火墙或其他东西。同样的结构在另一个项目中可以工作,但在当前项目中不行。我可能错过了一些关键点,但不确定是什么。@mrmuggles - ilhnctn
啊,抱歉,看起来它仍在尝试连接到本地主机而不是远程服务器?此外,如果您使用redis,则配置文件中可能缺少其他设置,“CELERY_RESULT_BACKEND”不足。您还需要使用以下设置: BROKER_URL = 'redis://100.100.100.1000:6379/0'。同时,在初始化应用程序时:app = Celery('blcorp', backend='redis')。 - mrmuggles
谢谢您的回答,问题已经解决了,但我不确定之前出了什么问题 :) - ilhnctn

2

在我的设置中添加CELERY_STORE_ERRORS_EVEN_IF_IGNORED = True解决了我的问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接