从Flask路由中进行Python asyncio调用

68

我希望每次执行Flask路由时都能执行一个异步函数。为什么abar函数从未被执行?

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()

我也尝试将阻塞调用放在一个单独的线程中。但它仍然没有调用abar函数。

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)

2
app.runloop.run_forever 都是阻塞的。你最好使用线程。如果你必须使用 asyncio,那么你应该研究一下基于它构建的类似 Flask 的框架。 - dirn
@dim 非常感谢。我尝试将一个阻塞移动到单独的线程中。请查看我的编辑后的问题! - user24502
5个回答

51

您可以在不完全将Flask应用程序转换为asyncio的情况下,将一些异步功能整合到其中。

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    loop.run_until_complete(abar("abar"))
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

这将阻塞Flask响应直到异步函数返回,但它仍然允许你做一些聪明的事情。我使用这种模式并行执行了许多外部请求,使用aiohttp,然后当它们完成时,我回到传统的Flask进行数据处理和模板渲染。

import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

def fight(responses):
    return "Why can't we all just get along?"

@app.route("/")
def index():
    # perform multiple async requests concurrently
    responses = loop.run_until_complete(asyncio.gather(
        fetch("https://google.com/"),
        fetch("https://bing.com/"),
        fetch("https://duckduckgo.com"),
        fetch("http://www.dogpile.com"),
    ))

    # do something with the results
    return fight(responses)

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

3
在生产环境中,通常使用像gevent、meinheld或eventlet这样的异步工作程序来运行Flask。我认为需要注意的是,这种解决方案会阻塞gevent/meinheld/eventlet的事件循环,从而抵消使用它们的某些优点。 - pgjones
17
你的示例代码导致了一个 RuntimeError: There is no current event loop in thread 'Thread-1' 错误。重现步骤:1)我将你的代码保存到 soexamp.py 文件中;2)运行 python soexamp.py 命令;3)然后执行 curl localhost:5000/ 命令。我的 Flask 版本是 '1.0.2',aiohttp 版本是 '3.5.4'。 - Anton Daneyko
10
这段内容的意思是:这不是线程安全的,不能从任意线程简单地使用 loop.run_until_complete()。 asyncio循环只能在特定线程中使用。任何实际的WSGI部署都将使用线程。不能调用 asyncio.get_event_loop(),而需要为每个线程创建一个新的事件循环。但是这样做有些过度。 - Martijn Pieters
1
@MartijnPieters,为什么每个线程创建新的事件循环会有问题?您能否解释一下什么是“不线程安全”吗? - ravi malhotra
4
“not thread safe”指的是当多个线程同时修改同一数据结构时,如果不考虑线程安全,可能会导致程序出现问题。除了一些明确文档记录的函数外,asyncio事件循环实现是不具备线程安全性的。在这里的代码中,并没有为每个线程创建一个新的事件循环,也没有正确地将协程传递给单个线程。请注意,我还在回答这个问题时提到了更好地解决这些问题的方法。 - Martijn Pieters
显示剩余3条评论

37

在我个人的观点中,解决您问题的更简单的方法是从Flask切换到Quart。这样,您的片段会变得更简单,如下所示:

import asyncio
from quart import Quart

async def abar(a):
    print(a)

app = Quart(__name__)

@app.route("/")
async def notify():
    await abar("abar")
    return "OK"

if __name__ == "__main__":
    app.run(debug=False)

正如其他答案中所指出的,Flask应用程序运行是阻塞的,并且不与asyncio循环交互。另一方面,Quart是建立在asyncio之上的Flask API,因此它应该按照您的期望工作。

另外更新一下,Flask-Aiohttp不再维护


我有几个包含同步/阻塞函数的库。如果我切换到Quart,会发生什么?当我调用这些库中的函数时,它会阻塞事件循环,对吗? - Arvind Sridharan
是的,它们会被阻塞。您可以使用asyncio.run_in_executor将对这些函数的调用包装起来,并等待它(默认情况下在另一个线程中运行函数)。或者,您可以切换到基于asyncio的替代库。 - pgjones
2
很抱歉给你点了踩,但是那些告诉你要切换整个框架才能触发后台任务的答案并没有真正帮到你。 - Eric Burel
Quart是一个不错的建议,但您的答案实际上没有正确回答问题,因为您使用了await调用,而原始问题要求异步地独立于服务器响应进行调用。 - deed02392

24

你的错误在于尝试在调用 app.run() 之后运行 asyncio 事件循环。后者不会返回,而是运行 Flask 开发服务器。

事实上,大多数 WSGI 设置都是这样工作的;要么主线程将忙于分派请求,要么 Flask 服务器作为模块在 WSGI 服务器中导入,在这里也无法启动事件循环。

相反,您需要在单独的线程中运行您的 asyncio 事件循环,然后通过asyncio.run_coroutine_threadsafe()在该单独线程中运行您的协程。有关此过程的详细信息,请参见文档中的协程和多线程部分

这里是一个运行此类事件循环线程并提供您安排在该循环中运行协程的实用程序的模块实现:

import asyncio
import itertools
import threading

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        self.started = threading.Event()
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.call_later(0, self.started.set)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            try:
                shutdown_executor = loop.shutdown_default_executor()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_executor)
            asyncio.set_event_loop(None)
            loop.close()

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread

    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                _loop_thread.started.wait(1)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    """Run the coroutine in the event loop running in a separate thread

    Returns a Future, call Future.result() to get the output

    """
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

您可以使用此处定义的run_coroutine()函数来调度asyncio例程。使用返回的Future实例来控制协程:

  • 使用Future.result()获取结果。您可以设置超时时间;如果在超时时间内没有产生结果,则自动取消协程。
  • 您可以使用.cancelled().running().done()方法查询协程的状态。
  • 您可以为Future添加回调函数,当协程完成、取消或引发异常时将调用该回调函数(请注意,这可能会从事件循环线程而不是您调用run_coroutine()的线程中调用)。

对于您的特定示例,其中abar()不返回任何结果,您可以忽略返回的future,就像这样:

@app.route("/")
def notify():
    run_coroutine(abar("abar"))
    return "OK"

请注意,在Python 3.8之前,您无法使用运行在单独线程上的事件循环来创建子进程!请参见我的回答 Python3 Flask asyncio subprocess in route hangs,了解Python 3.8 ThreadedChildWatcher类的后移解决方法。


假设我们在abar()函数内进行递归异步调用。如果abar()调用另一个async函数,例如:async def abar_1,我们应该使用run_coroutine(abar_1())还是await abar_1()。如果abar_1()调用另一个异步函数,那么情况是否相同?我有一个库,其中包含await func()定义,据我所知,我必须将所有内容转换为run_coroutine(func())格式,以便它们能够与您的代码一起工作。它们是否可以有一个wrapper()函数? - alper
1
@alper,你在这里没有谈论递归,只是普通的异步调用。通常情况下,你只需要在其他协程上使用await或创建一个任务对象来并发运行其他协程。请参见协程和任务。我回答中的代码仅用于将asyncio与Flask集成,一旦进入事件循环,请使用异步编程技术。 - Martijn Pieters

6
由于同样的原因,您将看不到此打印输出:
if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    print('Hey!')
    loop.run_forever()

loop.run_forever()从未被调用,因为正如@dirn所指出的那样,app.run也是阻塞的。

运行全局阻塞事件循环是运行asyncio协程和任务的唯一方式,但它与运行阻塞的Flask应用程序(或通常情况下的任何其他应用程序)不兼容。

如果您想使用异步Web框架,则应选择专门为异步而创建的框架。例如,目前可能最受欢迎的是aiohttp

from aiohttp import web


async def hello(request):
    return web.Response(text="Hello, world")


if __name__ == "__main__":
    app = web.Application()
    app.router.add_get('/', hello)
    web.run_app(app)  # this runs asyncio event loop inside

更新:

关于您尝试在后台线程中运行事件循环的问题。我没有深入调查,但似乎问题与线程安全有关: 许多asyncio对象不是线程安全的。如果您按照以下方式更改代码,它将工作:

def _create_task():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)

@app.route("/")
def notify():
    worker_loop.call_soon_threadsafe(_create_task)
    return "OK"

但是,这是非常不好的想法。不仅非常不方便,而且我认为没有太多意义:如果您要使用线程来启动asyncio,为什么不直接在Flask中使用线程而不是asyncio?这样你就可以拥有你想要的Flask和并行化。
如果我还没有说服您,请至少看一下Flask-aiohttp项目。它与Flask api非常接近,我认为比您尝试做的更好。

非常感谢您的解释,这很有道理。同时这是一个不错的小型aiohttp示例。不幸的是,我被绑定在flask/flask-ask上用于Alexa技能。我已经修改了我的原始问题并将一个阻塞调用移动到单独的线程中。但仍然没有运气。 - user24502
在Flask中运行一个asyncio循环是一个非常好的想法,并且没有任何问题,只要你稍微小心一点。当进行大量阻塞I/O时,线程和异步协程有非常不同的优缺点,使用asyncio比线程更可取。 - Martijn Pieters

4
主要问题,正如@Martijn Pieters@Mikhail Gerasimov在其他答案中已经解释的那样,是因为app.run是阻塞的,所以loop.run_forever()这一行永远不会被调用。您需要在单独的线程上手动设置和维护运行循环。
幸运的是,使用 Flask 2.0,您不再需要创建、运行和管理自己的事件循环了。您可以将路由定义为async def,并直接从路由函数中await协程。

https://flask.palletsprojects.com/en/2.0.x/async-await/

使用async和await

自版本2.0起新增功能。

如果使用pip install flask[async]安装Flask,那么路由、错误处理程序、请求前、请求后以及拆卸函数都可以是协程函数。这需要Python 3.7+并且支持contextvars.ContextVar。这使得视图可以使用async def定义并使用await

Flask将在每个请求上创建事件循环,您只需定义您的协程并等待它们完成即可:

https://flask.palletsprojects.com/en/2.0.x/async-await/#performance

性能

异步函数需要事件循环才能运行。作为WSGI应用程序,Flask使用一个工作线程来处理一个请求/响应周期。 当一个请求进入异步视图时,Flask将在一个线程中启动一个事件循环,在那里运行视图函数,然后返回结果。

即使对于异步视图,每个请求仍然会占用一个工作线程。好处是您可以在视图中运行异步代码,例如进行多个并发数据库查询、向外部API发出HTTP请求等。但是,您的应用程序一次可以处理的请求数量仍将保持不变。

import asyncio
from flask import Flask, jsonify

async def send_notif(x: int):
    print(f"Called coro with {x}")
    await asyncio.sleep(1)
    return {"x": x}

app = Flask(__name__)

@app.route("/")
async def notify():
    futures = [send_notif(x) for x in range(5)]
    results = await asyncio.gather(*futures)

    response = list(results)
    return jsonify(response)

# The recommended way now is to use `flask run`.
# See: https://flask.palletsprojects.com/en/2.0.x/cli/
# if __name__ == "__main__":
#     app.run(debug=False, use_reloader=False)

$ time curl -s -XGET 'http://localhost:5000'
[{"x":0},{"x":1},{"x":2},{"x":3},{"x":4}]


real    0m1.016s
user    0m0.005s
sys     0m0.006s

大多数使用asyncio的常见编程方案都可以以相同的方式应用。需要注意的一件事是,从Flask 2.0.1开始,我们不能使用asyncio.create_task来生成后台任务:

https://flask.palletsprojects.com/en/2.0.x/async-await/#background-tasks

异步函数会在事件循环中运行,直到完成后,事件循环将停止。这意味着,当异步函数完成时,任何未完成的附加任务都将被取消。因此,您无法生成后台任务,例如通过 "asyncio.create_task"。

如果您希望使用后台任务,最好使用任务队列来触发后台工作,而不是在视图函数中生成任务。

除了使用 "create_task" 的限制外,它应该适用于您想要进行异步数据库查询或多次调用外部 API 的用例。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接