Celery任务在执行中挂起,用于机器学习预测。

6

我正在尝试创建一个Web应用,它可以从POST请求中接收输入,并根据该输入提供一些ML预测。

由于预测模型相当复杂,我不希望用户等待计算完成。相反,我将重计算委托给了Celery任务,用户稍后可以查看结果。

我正在使用简单的Flask应用程序与Celery,Redis和Flower。

我的view.py文件:

@ns.route('predict/')
class Predict(Resource):
    ...
    def post(self):
        ...
        do_categorize(data)
        return jsonify(success=True)

我的tasks.py文件大致如下:

from ai.categorizer import Categorizer
categorizer = Categorizer(
    model_path='category_model.h5',
    tokenizer_path='tokenize.joblib',
    labels_path='labels.joblib'
)


@task()
def do_categorize(data):
    result = categorizer.predict(data)
    print(result)
    # Write result to the DB
    ...

我的Categorizer类中的predict()方法:

def predict(self, value):
    K.set_session(self.sess)
    with self.sess.as_default():
        with self.graph.as_default():
            prediction = self.model.predict(np.asarray([value], dtype='int64'))
            return prediction

我这样运行Celery:

celery worker -A app.celery --loglevel=DEBUG

我最近几天遇到的问题是categorizer.predict(data)调用在执行过程中卡住了。
我尝试在post方法中运行categorizer.predict(data),它可以正常工作。但是如果将其放置在Celery任务中,则停止工作。如果我尝试进行调试,它就会在.predict()上冻结。
我的问题:
  • 我如何解决这个问题?
  • 工作程序是否有任何内存、CPU限制?
  • Celery任务是否是进行这种重型计算的“正确”方式?
  • 我该如何调试这个问题?我做错了什么?
  • 在文件顶部初始化模型是否正确?
1个回答

6

感谢这个SO问题,我找到了解决我的问题的答案:

原来对于Keras来说,使用Threads池比默认的Process更好。

对我来说很幸运的是,使用 Celery 4.4 不久前重新引入了Threads池。 你可以在 Celery 4.4 Changelogs 中了解更多:

线程任务池

我们重新引入了使用 concurrent.futures.ThreadPoolExecutor 的线程任务池。

以前的线程任务池是实验性的。此外它基于已过时的 threadpool 包。

您可以通过将 worker_pool 设置为 'threads' 或通过传递 --pool threads 到 celery worker 命令来使用新的线程任务池。

现在你可以使用线程而不是进程进行池化。

celery worker -A your_application --pool threads --loginfo=INFO

如果你无法使用最新的Celery版本,你可以使用gevent软件包:

pip install gevent
celery worker -A your_application --pool gevent --loginfo=INFO

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接