Celery worker processes hang without any errors

17

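I have a production setup running Celery workers that make POST/GET requests to a remote service and store the results. It handles a load of about 20,000 tasks every 15 minutes.

The problem is that the workers go numb for no apparent reason: no errors, no warnings.

I tried adding multiprocessing as well, with the same result.

In the logs I see the task execution time increasing, as in "succeeded in s".

More details are in https://github.com/celery/celery/issues/2621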


Is the "store the response in MongoDB" part asynchronous? Is there any logging around it? - Kishor Pawar
4 Answers

41
If your Celery worker gets stuck from time to time, you can use strace and lsof to find out which system call it is stuck in.
For example:
$ strace -p 10268 -s 10000
Process 10268 attached - interrupt to quit
recvfrom(5,

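10268 is the PID of the Celery worker. The trailing recvfrom(5, means the worker is blocked while receiving data on file descriptor 5.

You can then use lsof to check what descriptor 5 refers to in this worker process.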

lsof -p 10268
COMMAND   PID USER   FD   TYPE    DEVICE SIZE/OFF      NODE NAME
......
celery  10268 root    5u  IPv4 828871825      0t0       TCP 172.16.201.40:36162->10.13.244.205:wap-wsp (ESTABLISHED)
......

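This shows that the worker is stuck on a TCP connection (look for 5u in the FD column).

Some Python packages, such as requests, block while waiting for data from the peer, and this can cause the Celery worker to hang. If you are using requests, make sure to set the timeout argument.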
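As a rough illustration of the timeout advice above, here is a minimal sketch of a helper that a task could call instead of a bare requests.get. The function name fetch, the session parameter and the timeout values are assumptions made for this example, not part of the original answer.

```python
import requests

# (connect timeout, read timeout) in seconds. These values are illustrative
# assumptions; pick limits that suit the remote service.
DEFAULT_TIMEOUT = (3.05, 30)


def fetch(url, session=None, timeout=DEFAULT_TIMEOUT):
    """GET url and return the response, or None if the peer is too slow.

    Without a timeout, requests waits indefinitely for data from the peer,
    which is the recvfrom() hang seen in strace.
    """
    http = session if session is not None else requests
    try:
        response = http.get(url, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout:
        # Raised for both connect and read timeouts.
        return None
```

Returning None keeps the sketch short. Inside a real task you may prefer to let the exception propagate so that Celery records the failure or retries the task.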


Have you seen this page:

https://www.caktusgroup.com/blog/2013/10/30/using-strace-debug-stuck-celery-tasks/


5
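I ran into this problem too, when calling delay on a shared_task with celery, kombu, amqp and billiard. After the API call everything worked until execution reached delay() on the @shared_task, and there it hung.

The problem turned out to be that the main application's __init__.py was missing the setting below.

It makes sure the app is always imported when Django starts, so that shared_task uses this app.

In __init__.py: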

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celeryApp

#__all__ = ('celeryApp',)
__all__ = ['celeryApp']
    

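Note 1: Replace celeryApp with the name of your application, that is, import the app defined in celery.py and expose it here.

Note 2: If hanging shared tasks are the only problem you have, the solution above may be enough and you can ignore the rest.

One more issue worth mentioning: if you get connection error 111, check that your versions of amqp, billiard, celery and kombu are supported together, for example amqp==2.2.2, billiard==3.5.0.3, celery==4.1.0, kombu==4.1.0. These versions are only examples. Also check that Redis is installed on the system, if you use Redis.

Also make sure you are using Kombu 4.1.0. In the latest version of Kombu, async was renamed to asynchronous.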
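The version check suggested in the note above can be sketched with the standard library alone. The helper name installed_versions is made up for this example. The script only reports what is installed and does not decide whether a combination is compatible.

```python
from importlib.metadata import PackageNotFoundError, version

# The four packages named in the answer.
PACKAGES = ("celery", "kombu", "amqp", "billiard")


def installed_versions(packages=PACKAGES):
    """Map each package name to its installed version, or None if missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, number in installed_versions().items():
        print(f"{name}=={number}" if number else f"{name} is not installed")
```

Run it in the same virtual environment as the worker, then compare the output with the versions pinned in your requirements file.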

0
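You should increase the free disk space available to RabbitMQ above the configured limit (disk_free_limit).
Another case: RabbitMQ is running low on disk space. From the RabbitMQ documentation:
When free disk space drops below a configured limit (50 MB by default), an alarm will be triggered and all producers will be blocked.
For more information, see the related Celery topic.
In my case, Celery hung in the recv function: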
File "/app/manage.py", line 32, in <module>
    execute_from_command_line(sys.argv)
File "/usr/local/lib/python3.10/site-packages/django/core/management/__init__.py", line 446, in execute_from_command_line
    utility.execute()
File "/usr/local/lib/python3.10/site-packages/django/core/management/__init__.py", line 398, in execute
    autoreload.check_errors(django.setup)()
File "/usr/local/lib/python3.10/site-packages/django/utils/autoreload.py", line 64, in wrapper
    fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/django/__init__.py", line 24, in setup
    apps.populate(settings.INSTALLED_APPS)
File "/usr/local/lib/python3.10/site-packages/django/apps/registry.py", line 124, in populate
    app_config.ready()
File "/app/my_app/api_v1/apps.py", line 21, in ready
    process_data.delay()
File "/usr/local/lib/python3.10/site-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
File "/usr/local/lib/python3.10/site-packages/sentry_sdk/integrations/celery.py", line 129, in apply_async
    return f(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
File "/usr/local/lib/python3.10/site-packages/celery/app/base.py", line 798, in send_task
    amqp.send_task_message(P, name, message, **options)
File "/usr/local/lib/python3.10/site-packages/celery/app/amqp.py", line 554, in send_task_message
    evd.publish('task-sent', sent_event,
File "/usr/local/lib/python3.10/site-packages/celery/events/dispatcher.py", line 139, in publish
    return self._publish(event, producer,
File "/usr/local/lib/python3.10/site-packages/celery/events/dispatcher.py", line 146, in _publish
    producer.publish(
File "/usr/local/lib/python3.10/site-packages/kombu/messaging.py", line 186, in publish
    return _publish(
File "/usr/local/lib/python3.10/site-packages/kombu/connection.py", line 563, in _ensured
    return fun(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/kombu/messaging.py", line 202, in _publish
    [maybe_declare(entity) for entity in declare]
File "/usr/local/lib/python3.10/site-packages/kombu/messaging.py", line 202, in <listcomp>
    [maybe_declare(entity) for entity in declare]
File "/usr/local/lib/python3.10/site-packages/kombu/messaging.py", line 107, in maybe_declare
    return maybe_declare(entity, self.channel, retry, **retry_policy)
File "/usr/local/lib/python3.10/site-packages/kombu/common.py", line 113, in maybe_declare
    return _maybe_declare(entity, channel)
File "/usr/local/lib/python3.10/site-packages/kombu/common.py", line 153, in _maybe_declare
    entity.declare(channel=channel)
File "/usr/local/lib/python3.10/site-packages/kombu/entity.py", line 184, in declare
    return (channel or self.channel).exchange_declare(
File "/usr/local/lib/python3.10/site-packages/amqp/channel.py", line 624, in exchange_declare
    self.send_method(
File "/usr/local/lib/python3.10/site-packages/amqp/abstract_channel.py", line 79, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
File "/usr/local/lib/python3.10/site-packages/amqp/abstract_channel.py", line 99, in wait
    self.connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 525, in drain_events
    while not self.blocking_read(timeout):
File "/usr/local/lib/python3.10/site-packages/amqp/connection.py", line 530, in blocking_read
    frame = self.transport.read_frame()
File "/usr/local/lib/python3.10/site-packages/amqp/transport.py", line 294, in read_frame
    frame_header = read(7, True)
File "/usr/local/lib/python3.10/site-packages/amqp/transport.py", line 627, in _read
    s = recv(n - len(rbuf))
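One way to confirm the disk alarm described in this answer is to query the RabbitMQ management HTTP API. This is a sketch under several assumptions: the management plugin is enabled, it listens on the URL you pass in (http://localhost:15672 by default), and the credentials are valid. The helper names are hypothetical.

```python
import base64
import json
import urllib.request


def fetch_nodes(base_url, user, password, timeout=5):
    """Return the parsed JSON list from the management API's /api/nodes."""
    request = urllib.request.Request(f"{base_url}/api/nodes")
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    # A timeout here keeps this check from hanging as well.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


def nodes_with_disk_alarm(nodes):
    """Names of nodes whose free disk space is below disk_free_limit."""
    return [node["name"] for node in nodes if node.get("disk_free_alarm")]
```

For example, nodes_with_disk_alarm(fetch_nodes("http://localhost:15672", "guest", "guest")) returns an empty list when no node is in alarm. The guest account is only usable from localhost on a default installation.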

-6

Follow this tutorial:

Celery Django link

Add the following to your settings

NB: install Redis for the transport and the results

   # TRANSPORT
   CELERY_BROKER_TRANSPORT = 'redis'
   CELERY_BROKER_HOST = 'localhost'
   CELERY_BROKER_PORT = '6379'
   CELERY_BROKER_VHOST = '0'

   # RESULT
   CELERY_RESULT_BACKEND = 'redis'
   CELERY_REDIS_HOST = 'localhost'
   CELERY_REDIS_PORT = '6379'
   CELERY_REDIS_DB = '1'
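For comparison, the same broker and result backend can be written in URL form. This is only a sketch. The names CELERY_BROKER_URL, REDIS_HOST and REDIS_PORT are not from the answer, and the CELERY_ prefix assumes Celery 4 or later loading Django settings with namespace='CELERY'.

```python
# Assumed local Redis instance, matching the host and port used above.
REDIS_HOST = "localhost"
REDIS_PORT = 6379

# Database 0 carries the task messages, database 1 stores the results.
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/1"
```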

Content provided by Stack Overflow.