多个异步上下文管理器

11

在Python中是否有可能将异步上下文管理器组合起来使用?类似于asyncio.gather,但可以与上下文管理器一起使用。就像这样:

async def foo():
    async with asyncio.gather_cm(start_vm(), start_vm()) as vm1, vm2:
        await vm1.do_something()
        await vm2.do_something()

目前这是否可行?

1个回答

15

类似于 gather_cm 的功能可以使用 Python 3.7 中引入的AsyncExitStack 实现:

async def foo():
    async with AsyncExitStack() as stack:
        vm1, vm2 = await asyncio.gather(
            stack.enter_async_context(start_vm()),
            stack.enter_async_context(start_vm()))
        await vm1.do_something()
        await vm2.do_something()

很遗憾,__aexit__ 仍然会按顺序运行。这是因为 AsyncExitStack 模拟了嵌套的上下文管理器,它们有明确定义的顺序,并且不能重叠。外部上下文管理器的 __aexit__ 会得到有关内部上下文管理器是否引发异常的信息。(数据库句柄的 __aexit__ 可能会用这个信息来回滚事务,如果没有异常则提交。) 并行运行 __aexit__ 会使上下文管理器重叠,并且异常信息不可用或不可靠。因此,尽管 gather(...) 并行运行 __aenter__,但 AsyncExitStack 记录了哪一个先执行,并按相反顺序运行 __aexit__

使用异步上下文管理器时,像 gather_cm 这样的替代方法非常合理。可以放弃嵌套语义,并提供一个类似于“退出池”而不是栈的聚合上下文管理器。退出池接受一些彼此独立的上下文管理器,这允许它们的 __aenter____aexit__ 方法并行运行。

棘手的部分是正确处理异常: 如果任何一个 __aenter__ 引发异常,必须传播该异常以防止运行 with 块。为了确保正确性,池必须保证对所有已完成 __aenter__ 的上下文管理器调用 __aexit__

以下是一个示例实现:

import asyncio
import sys

class gather_cm:
    def __init__(self, *cms):
        self._cms = cms

    async def __aenter__(self):
        futs = [asyncio.create_task(cm.__aenter__())
                for cm in self._cms]
        await asyncio.wait(futs)
        # only exit the cms we've successfully entered
        self._cms = [cm for cm, fut in zip(self._cms, futs)
                     if not fut.cancelled() and not fut.exception()]
        try:
            return tuple(fut.result() for fut in futs)
        except:
            await self._exit(*sys.exc_info())
            raise

    async def _exit(self, *args):
        # don't use gather() to ensure that we wait for all __aexit__s
        # to complete even if one of them raises
        done, _pending = await asyncio.wait(
            [cm.__aexit__(*args)
             for cm in self._cms if cm is not None])
        return all(suppress.result() for suppress in done)

    async def __aexit__(self, *args):
        # Since exits are running in parallel, so they can't see each
        # other exceptions.  Send exception info from `async with`
        # body to all.
        return await self._exit(*args)

这个测试程序展示了它的工作原理:

class test_cm:
    def __init__(self, x):
        self.x = x
    async def __aenter__(self):
        print('__aenter__', self.x)
        return self.x
    async def __aexit__(self, *args):
        print('__aexit__', self.x, args)

async def foo():
    async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
        print('cm1', cm1)
        print('cm2', cm2)

asyncio.run(foo())

AsyncExitStack方法似乎不安全;看起来AsyncExitStack可能会在所有__aenter__调用终止之前被解除,导致并发修改错误和/或未退出的上下文管理器。 - user2357112
@user2357112 如果 __aenter__ 都没有引发异常,await gather(...) 将等待它们都完成。但是如果其中一个引发了异常,其他的将会在后台继续执行,并且可能会出现您所描述的情况。 - user4815162342

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接