如何正确使用Python的asyncio模块创建和运行并发任务?

96
我正在尝试理解并使用Python 3相对较新的asyncio模块来正确地实现两个并发运行的Task对象。
简而言之,asyncio似乎是设计用于处理事件循环中异步进程和并发Task执行的。它推广了await(在async函数中应用)作为一种无回调等待和使用结果的方式,而无需阻塞事件循环。(Futures和回调仍然是可行的替代方案)。
它还提供了asyncio.Task()类,这是Future的一个特殊子类,旨在包装协程。最好使用asyncio.ensure_future()方法来调用它。asyncio任务的预期用途是允许独立运行的任务在同一事件循环中与其他任务“并发”运行。我的理解是,Tasks与事件循环连接,然后自动在await语句之间驱动协程。
我很喜欢能够使用并发任务而无需使用Executor类之一的想法,但我没有找到太多有关实现的详细信息。
这是我目前的做法:
import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

在尝试同时运行两个循环任务的情况下,我发现除非任务有内部await表达式,否则它将在while循环中卡住,有效地阻塞其他任务的运行(就像普通的while循环一样)。然而,一旦任务必须等待,它们似乎可以并发运行而没有问题。
因此,await语句似乎为事件循环提供了一个支点,以便在任务之间进行切换,从而产生并发效果。
带有内部await的示例输出:
running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

没有内部await的示例输出:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

问题

这个实现是否是使用 asyncio 进行并发循环任务的“正确”示例?

唯一的方式是让 Task 提供一个阻塞点(await 表达式),以便事件循环可以处理多个任务,这是正确的吗?

编辑

2022年更新:请注意,自此问题提出以来,asyncio API 已经发生了相当大的变化。请参见新标记为正确答案的答案,该答案现在显示了在 Python 3.10 中给出的 API 的正确使用方式。我仍然建议从 @dano 的答案中获取更广泛的了解。


4
是的,任务会在从yield from到下一个yield from期间原子化地执行自己。 - Andrew Svetlov
3个回答

102

是的,在您的事件循环中运行的任何协程都会阻塞其他协程和任务的运行,除非:

  1. 使用 yield fromawait(如果使用 Python 3.5+)调用另一个协程。
  2. 返回。

这是因为 asyncio 是单线程的;事件循环运行的唯一方式是没有其他协程在积极执行。使用 yield from/await 会暂时挂起协程,给事件循环运行的机会。

您的示例代码没问题,但在许多情况下,您可能不希望长时间运行的代码内嵌在事件循环中而不进行异步I/O。在这些情况下,通常最好使用asyncio.loop.run_in_executor在后台线程或进程中运行代码。如果您的任务是CPU密集型,则应该选择ProcessPoolExecutor,如果需要执行一些不太友好于 asyncio 的 I/O,则应该使用 ThreadPoolExecutor

例如,您的两个循环完全是 CPU 密集型的,并且不共享任何状态,因此最佳性能是使用 ProcessPoolExecutor 在多个 CPU 上并行运行每个循环:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.new_event_loop()
    boo = loop.run_in_executor(executor, say_boo)
    baa = loop.run_in_executor(executor, say_baa)

    loop.run_forever()

还重新调整了run_in_executor的部分,如下所示:loop.run_in_executor(executor, asyncio.Task(say_boo())) - songololo
2
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - dano
当执行器中的任务完成时,我如何添加回调函数? - luisgepeto
1
看起来 asyncio.asyncensure_future 的别名,现在已被弃用。 - srobinson
1
我复制了代码并在Python 3.9中运行它。它抛出一个错误 - RuntimeError: no running event loop; 但是继续执行。 此外,如上所述,boo正在阻止进一步的代码执行。我想我明白了 - 创建任务,在当前循环中生成,但在“外部”执行器上执行 - 哇,可以在后台工作。但它不起作用。第一个任务立即运行并阻塞其余任务,因为它在同一个循环中。我删除了asyncio.create_task(),现在似乎正常工作 - 两个函数都开始在loop.run_forever()行上运行。 - bez imienny
显示剩余7条评论

18

在Python 3.10中,asyncio.ensure_futureasyncio.get_event_loop这两个函数已被弃用。

你可以通过asyncio.create_task同时运行两个协程say_boosay_baa

async def main():
    boo = asyncio.create_task(say_boo())
    baa = asyncio.create_task(say_baa())
    await boo
    await baa

asyncio.run(main())

您还可以使用asyncio.gather

async def main():
    await asyncio.gather(say_boo(), say_baa())

asyncio.run(main())

2
为了新观点的受益,我将这个问题标记为正确,考虑到API的最新更改。 - songololo

16

你不一定需要yield from x来控制事件循环。

在你的示例中,我认为正确的方法是执行yield None或者等价于简单的yield,而不是执行yield from asyncio.sleep(0.001)

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()
协程只是普通的 Python 生成器。在内部,asyncio事件循环会记录这些生成器并逐一调用它们中的每一个gen.send(),从而形成一个无限循环。每当您使用yield时,对gen.send()的调用完成后,循环就可以继续执行。(我正在简化它;请参阅https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265查看实际代码)
话虽如此,如果您需要进行CPU密集型计算而不共享数据,我仍然建议您使用run_in_executor

在Python 3.4中可以工作,但在Python 3.5中似乎无法工作。是否有类似的方法适用于3.5?(使用“None”似乎比在每个地方都使用“asyncio.sleep()”更优雅...) - songololo
25
自 Python 3.5 开始,正确的方法是使用 asyncio.sleep(0)。[参见此讨论。] - Jashandeep Sohi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接