Celery工作者内的多线程

18

我正在使用Celery和RabbitMQ处理API请求的数据。 过程如下:

请求 > API > RabbitMQ > Celery工人 > 返回

理想情况下,我会生成更多的Celery工人,但由于内存限制而受限。

目前,我的流程瓶颈是从传递给工作程序的URL中获取和下载数据。 大致上,该过程如下:

def celery_gets_job(url):
    data = fetches_url(url)       # takes 0.1s to 1.0s (bottleneck)
    result = processes_data(data) # takes 0.1s
    return result

工人在获取URL时被锁定了一段时间是不可接受的。我正在考虑通过线程改进这个问题,但我不确定最佳实践是什么。

  • 是否有办法使celery工作程序在处理数据的同时异步下载传入的数据到不同的线程中?

  • 应该使用单独的工作程序来获取和处理数据,通过某种形式的消息传递(可能通过RabbitMQ)进行通信吗?


1
你可以考虑在Celery任务中使用类似multiprocessing pipes这样的东西,通过创建两个多进程来实现。当然,你的多进程应该受到池的限制。如果我没记错的话,在RabbitMQ/结果后端共享获取的URL大数据不是一个好主意。Celery低级API也可能具有类似的功能。 - Sanket Sudake
1
我不了解RabbitMQ,但我认为多进程比多线程更适合你,因为celery_gets_job有多个非原子操作,使用多线程会导致问题。您可以使用队列,其中数据由运行fetches_url(url)的进程池填充,并且另一个进程用于执行processes_data(data) - shrishinde
这可能是你正在寻找的内容:https://dev59.com/Ol4c5IYBdhLWcg3wFnA4 - fpbhb
1
Celery 的创建者发布的这篇文章 https://news.ycombinator.com/item?id=11889549 或许是你在寻找的内容。 - dyeray
2个回答

3
使用 eventlet 库,您可以为标准库打补丁以使其变成异步的。
首先导入异步 urllib2:
from eventlet.green import urllib2

因此,您可以使用以下方式获取URL正文:

def fetch(url):
    body = urllib2.urlopen(url).read()
    return body

这里可以看到更多与eventlet相关的示例。


3
使用eventlet执行池http://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html 时,应该直接进行io调用的猴子补丁。 - dyeray
但是,那么 processes_data(data) 不会仍然阻塞并使得合并结果比以前更慢吗? - ostrokach

0
我会创建两个任务,一个用于下载数据,另一个用于在下载完成后处理数据。这样您就可以独立扩展这两个任务。请参考:路由,

看起来不像是一个解决方案。工作人员仍然会被卡在等待io完成的状态中。目标是让一个工作人员同时下载多个url。 - ogurets

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接