Flask - 作业未作为后台进程运行

7

我正在尝试运行一个包含以下内容的Flask应用:

  1. 实时处理API请求
  2. 将每个请求上传到SQLalchemy数据库中
  3. 作为后台进程运行任务1任务2

为此,我有以下代码:

import concurrent.futures
import queue
from concurrent.futures import ThreadPoolExecutor

from flask import Flask, current_app

app = Flask(__name__)
q = queue.Queue()


def build_cache():
    # 1. Yielding API requests on the fly
    track_and_features = spotify.query_tracks()  # <- a generator
    while True:
        q.put(next(track_and_features))


def upload_cache(tracks_and_features):
    # 2. Uploading each request to a `SQLalchemy` database
    with app.app_context():
        Upload_Tracks(filtered_dataset=track_and_features)

    return "UPLOADING TRACKS TO DATABASE"


@app.route("/cache")
def cache():
    # 3. Do `1` and `2` as a background process
    with concurrent.futures.ThreadPoolExecutor() as executor:

        future_to_track = {executor.submit(build_cache): "TRACKER DONE"}

        while future_to_track:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                future_to_track,
                timeout=0.25,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )

            # if there is incoming work, start a new future
            while not q.empty():

                # fetch a track from the queue
                track = q.get()

                # Start the load operation and mark the future with its TRACK
                future_to_track[executor.submit(upload_cache, track)] = track
            # process any completed futures
            for future in done:
                track = future_to_track[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print("%r generated an exception: %s" % (track, exc))

                del future_to_track[future]

    return "Cacheing playlist in the background..."

上述方法都可以使用,但不能作为后台进程。调用cache()时,应用程序会挂起,并且只有当该进程完成时才会恢复。
我使用gunicorn -c gconfig.py app:app -w 4 --threads 12来运行它,那么我的问题出在哪里呢?
# 1st background process
def build_cache():
    # only ONE JOB
    tracks_and_features = spotify.query_tracks()  # <- not a generator
    while True:
        print(next(tracks_and_features))


# background cache
@app.route("/cache")
def cache():
    executor.submit(build_cache)
    return "Cacheing playlist in the background..."

然后进程在后台运行。

但是,如果我添加另一个作业:

def build_cache():

    tracks_and_features = spotify.query_tracks()
    while True:
        # SQLalchemy db
        Upload_Tracks(filtered_dataset=next(tracks_and_features))

背景图片再次无法正常工作。

简而言之:

只有在一次只运行一个任务时,背景图片才能正常工作(这也是使用队列的初衷限制)

看起来问题是将后台进程绑定到SQLalchemy,不确定。完全迷失了。


你是如何部署你的应用程序的?你使用WSGI服务器吗? - Fine
是的。我使用 gunicorn -c gconfig.py app:app -w 4 --threads 12 命令来运行应用程序。 - 8-Bit Borges
如果您所说的应用是指连接到Flask API的客户端,那么问题可能不在Flask上,而在于您的应用程序以及它发出请求的方式? - Fine
请参考我的编辑。 - 8-Bit Borges
“yield API requests on the fly”是什么意思?“upload every request to a DB”又是什么意思?你是想记录服务器上的每个API调用,还是尝试缓存你对Spotify的API调用结果,以便无需再次调用它们? - Marco Lavagnino
显示剩余6条评论
2个回答

4

还不确定您的意思:

我是指应用程序等待所有登录请求后,才能进入主页。它应该立即进入主页并在后台处理请求。

这里有一些问题:

  • 您的队列是全局的,即每个gunicorn工作线程只有一个队列;您可能希望将队列绑定到每个请求上,以便多个请求不会共享同一个队列。考虑使用context locals
  • 如果UploadTracks正在写入数据库,则表上可能有锁。检查您的索引并检查数据库中的锁等待。
  • SQLAlchemy可能配置了较小的连接池,第二个UploadTracks正在等待第一个返回其连接。

在您的第一个示例中,在返回之前,端点正在等待所有futures完成,而在您的第二个示例中,端点在提交任务给执行器后立即返回。如果您想让flask在任务仍在后台运行时快速响应,请删除with concurrent.futures.ThreadPoolExecutor() as executor:并在模块顶部构建全局线程池。

使用with,上下文管理器在退出之前等待所有提交的任务完成,但我不确定这是否是您的主要问题。


谢谢您提供详细的答案。不幸的是,这些解决方案都没有解决我的问题。没有锁定,SQLalchemy 与当前池一起正常工作,如果我忽略 queues,问题仍然存在。一个后台作业可以工作,但两个就不行。 - 8-Bit Borges

2
尝试在路由处理程序之外创建ThreadPoolExecutor
import time
from concurrent.futures import ThreadPoolExecutor

from flask import Flask


def foo(*args):
    while True:
        print("foo", args)
        time.sleep(10)


app = Flask(__name__)

executor = ThreadPoolExecutor()


@app.route("/cache")
def cache():
    executor.submit(foo, "1")
    executor.submit(foo, "2")
    return "in cache"

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接