我正在使用Celery和RabbitMQ处理API请求的数据。 过程如下:
请求 > API > RabbitMQ > Celery工人 > 返回
理想情况下,我会生成更多的Celery工人,但由于内存限制而受限。
目前,我的流程瓶颈是从传递给工作程序的URL中获取和下载数据。 大致上,该过程如下:
def celery_gets_job(url):
data = fetches_url(url) # takes 0.1s to 1.0s (bottleneck)
result = processes_data(data) # takes 0.1s
return result
工人在获取URL时被锁定了一段时间是不可接受的。我正在考虑通过线程改进这个问题,但我不确定最佳实践是什么。
是否有办法使celery工作程序在处理数据的同时异步下载传入的数据到不同的线程中?
应该使用单独的工作程序来获取和处理数据,通过某种形式的消息传递(可能通过RabbitMQ)进行通信吗?
celery_gets_job
有多个非原子操作,使用多线程会导致问题。您可以使用队列,其中数据由运行fetches_url(url)
的进程池填充,并且另一个进程用于执行processes_data(data)
。 - shrishinde