在异步生成器函数内部使用yield from语句是否不好?

4
我被告知以下代码不安全,因为在托育场所内不允许有从异步生成器产生的内容,除非它是异步上下文管理器。
T = TypeVar('T')

async def delay(interval: float, source: AsyncIterable[T]) -> AsyncIterable[T]:
    """Delays each item in source by an interval.

    Received items are temporarily stored in an unbounded queue, along with a timestamp, using
    a background task. The foreground task takes items from the queue, and waits until the
    item is older than the given interval and then yields it."""

    send_channel, receive_channel = trio.open_memory_channel(math.inf)

    async def pull_task():
        async with aclosing(source) as agen:
            async for item in agen:
                send_channel.send_nowait((item, trio.current_time() + interval))

    async with trio.open_nursery() as nursery:
        nursery.start_soon(pull_task)
        async with receive_channel:
            async for item, timestamp in receive_channel:
                now = trio.current_time()
                if timestamp > now:
                    await trio.sleep(timestamp - now)
                yield item

我很难理解这个生成器函数是如何可能出错的。如果有人能提供一个使用这个确切的生成器函数的示例代码,演示其不安全性,那将不胜感激并得到奖励。
上述代码的目标是延迟处理异步序列,而不施加任何背压。如果您可以证明此代码与我的期望不同,请也告知。
谢谢。
1个回答

6

不幸的是,这是正确的 - 除了使用@contextlib.asynccontextmanager创建异步上下文管理器或编写异步pytest fixture的狭窄情况之外,nursery或cancel scope内部的yield不受支持。

这其中有几个原因。其中一些是技术性的:Trio必须跟踪当前堆栈上活动的哪些nursery/cancel scopes,当您从一个中yield出来时,它会打破嵌套,并且Trio无法知道您已经这样做了。(库无法检测到上下文管理器中的yield

但也有一个根本上无法解决的原因,那就是Trio和结构化并发的整个理念是每个任务都“属于”可以接收子任务崩溃通知的父任务。但是当您在生成器中yield时,生成器帧被冻结并与当前任务分离 - 它可能在另一个任务中恢复,也可能永远不会恢复。因此,当您yield时,它会打破nursery中所有子任务与其父任务之间的链接。没有办法将其与结构化并发的原则协调起来。

在 Trio 聊天中,Joshua Oreman 给出了一个具体的例子,该例子可以帮助解决你的问题:

if I run the following

async def arange(*args):
    for val in range(*args):
        yield val

async def break_it():
    async with aclosing(delay(0, arange(3))) as aiter:
        with trio.move_on_after(1):
            async for value in aiter:
                await trio.sleep(0.4)
                print(value)

trio.run(break_it)

then I get

RuntimeError: Cancel scope stack corrupted: attempted to exit
<trio.CancelScope at 0x7f364621c280, active, cancelled> in <Task
'__main__.break_it' at 0x7f36462152b0> that's still within its child
<trio.CancelScope at 0x7f364621c400, active>

This is probably a bug in your code, that has caused Trio's internal
state to become corrupted. We'll do our best to recover, but from now
on there are no guarantees.

Typically this is caused by one of the following:
  - yielding within a generator or async generator that's opened a cancel
    scope or nursery (unless the generator is a @contextmanager or
    @asynccontextmanager); see https://github.com/python-trio/trio/issues/638 [...]

By changing the timeouts and delay so that the timeout expired while inside the generator rather than while outside of it, I was able to get a different error also: trio.MultiError: Cancelled(), GeneratorExit() raised out of aclosing()

这里也有一份关于所有这些问题的长篇讨论,我们在这里得出结论,这是不可支持的:https://github.com/python-trio/trio/issues/264

这是一个不幸的情况,因为我们不能支持它,甚至更糟糕的是,它在简单情况下看起来像是可以工作的,所以人们可能会在意识到它无法工作之前编写大量使用此技巧的代码 :-(

我们的计划是使非法情况在尝试 yield 时立即给出明显错误,以至少避免第二个问题。但是,这需要 向Python解释器添加一些额外的钩子,所以需要一段时间。

还有一种构造方式可以创建一个与异步生成器编写和使用几乎一样简单的构造方式,但避免了这个问题。其思想是,不是将生成器从正在消耗它的任务的堆栈中推入和弹出,而是将“生成器”代码作为第二个任务运行,向消费者任务提供值。请参见此处开始的线程以获取更多详细信息。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接