使用asyncio同时运行阻塞和非阻塞任务

3

我希望能够同时异步运行阻塞和非阻塞任务。显然,需要使用asyncio中的run_in_executor方法来处理阻塞任务。以下是我的示例代码:

import asyncio
import concurrent.futures
import datetime
import time


def blocking():
    print("Enter to blocking()", datetime.datetime.now().time())
    time.sleep(2)
    print("Exited from blocking()", datetime.datetime.now().time())


async def waiter():
    print("Enter to waiter()", datetime.datetime.now().time())
    await asyncio.sleep(3)
    print("Exit from waiter()", datetime.datetime.now().time())


async def asynchronous(loop):
    print("Create tasks", datetime.datetime.now().time())
    task_1 = asyncio.create_task(waiter())

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    task_2 = loop.run_in_executor(executor, blocking)

    tasks = [task_1, task_2]
    print("Tasks are created", datetime.datetime.now().time())
    await asyncio.wait(tasks)


if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asynchronous(loop))
    except (OSError) as exc:
        sys.exit('Exception: ' + str(exc))

在run_in_executor中,我应该使用相同的事件循环来处理阻塞任务,还是需要使用另一个?为使代码异步执行,我应该更改哪些内容?谢谢


除了答案中提到的内容,还要注意不需要为每次运行创建新的线程池。线程池的目的是为了重复使用线程以提高效率。您只需将None作为第一个参数传递给run_in_executor,它将使用由asyncio为事件循环创建的线程池。 - user4815162342
哦,我明白了。但是如果我不使用自定义线程池中的单独线程,它会阻塞带有事件循环的线程,对吧? - d.golov
我并不是说你应该使用线程池,我只是在说你不需要创建自己的线程池。将 None 传递给 run_in_executor 将使用 asyncio 自己的线程池,这个线程池就是为此而存在的。该线程池仍然有自己的线程,并且不会阻塞事件循环线程。 - user4815162342
我明白了。我刚才对官方文档中的这个短语感到困惑:“loop.run_in_executor()方法可以与concurrent.futures.ThreadPoolExecutor一起使用,以在不阻塞事件循环运行的操作系统线程的情况下执行阻塞代码。” 我原本认为默认情况下只有一个带有事件循环的线程,并且使用自己的线程池可以避免主线程被事件循环阻塞。 - d.golov
你的想法是正确的,确实只有一个线程运行事件循环。这个线程池是一个辅助工具,用于避免每个人都创建自己的线程池并且线程数量无限增长的情况。因此,使用_一个_线程池确实是必要的,但它不一定是你自己的 - asyncio提供的_线程池_(与运行事件循环的线程不同)是完全可以的。 - user4815162342
@user4815162342,非常感谢您的解释。现在对我来说更加清晰了。您真是太棒了。谢谢。 - d.golov
1个回答

4

必须 使用相同的循环。该循环委托给执行器,该执行器在单独线程中运行任务以操作事件循环。因此,您不必担心阻塞任务会阻塞事件循环。如果您使用单独的循环,则来自事件循环的异步函数将无法等待在新循环中运行的阻塞函数的结果。

事件循环通过创建一个 future 来管理这个过程来表示执行器任务。然后,在执行器的一个线程中运行阻塞任务,当执行器任务返回时,将设置 future 的结果并将控制返回到事件循环中的等待函数(如果有的话)。


非常感谢。那么上面的代码似乎是正确的吗?创建了一个事件循环,并用于阻塞和非阻塞任务。 - d.golov
很好,除了你应该避免在异步函数中传递事件循环 - 这可能会导致错误。相反,如果异步函数需要访问事件循环,请使用asyncio.get_running_loop()。这是在3.7中新增的功能,因此如果您使用的是3.6或更低版本,请使用get_event_loop() - Dunes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接