Python:异步生成器已在运行

6

就像下面的例子一样,当使用异步生成器时我遇到了一个不寻常的错误。

async def demo():
    async def get_data():
        for i in range(5):  # loop: for or while
            await asyncio.sleep(1)  # some IO code

            yield i

    datas = get_data()

    await asyncio.gather(
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
        anext(datas),
    )


if __name__ == '__main__':
    # asyncio.run(main())
    asyncio.run(demo())

控制台输出:

2022-05-11 23:55:24,530 DEBUG asyncio 29180 30600 Using proactor: IocpProactor
Traceback (most recent call last):
  File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 77, in <module>
    asyncio.run(demo())
  File "D:\devtools\Python310\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "D:\devtools\Python310\lib\asyncio\base_events.py", line 641, in run_until_complete
    return future.result()
  File "E:\workspace\develop\python\crawlerstack-proxypool\demo.py", line 66, in demo
    await asyncio.gather(
RuntimeError: anext(): asynchronous generator is already running

情境描述:我有一个循环逻辑,每次从Redis中获取一批数据,并想使用yield返回结果。但在创建并发任务时出现了错误。

有没有好的解决方案?我不是要改变现在的使用方式,而是想看看是否可以判断它是否正在运行或者像锁定一样等待它运行,然后再执行anext。

也许我的逻辑现在不太合理,但我也想了解一些关键语言,让我意识到这个问题的严重性。

谢谢您的帮助。


1
你根本不需要异步生成器。如果你的 get_data 执行了 asyncio.sleep 模拟的任何操作并返回其结果,那么你只需收集 get_data 五次(或实际数字是多少次)即可。 - dirn
1个回答

11

简短概述:正确的方法

异步生成器不适合并行消费。请看下面的解释。作为一个适当的解决方法,使用asyncio.Queue来实现生产者和消费者之间的通信:

queue = asyncio.Queue()

async def producer():
    for item in range(5):
        await asyncio.sleep(random.random())  # imitate async fetching
        print('item fetched:', item)
        await queue.put(item)

async def consumer():
    while True:
        item = await queue.get()
        await asyncio.sleep(random.random())  # imitate async processing
        print('item processed:', item)

await asyncio.gather(producer(), consumer(), consumer())

上面的代码片段适用于无限流的项目:例如,一个永远运行并为客户端提供请求的Web服务器。但是,如果我们需要处理有限数量的项目呢?消费者应该如何知道何时停止?

这值得在Stack Overflow上提出另一个问题来涵盖所有的替代方案,但最简单的选择是使用下面描述的sentinel方法。

Sentinel: 有限数据流方法

引入一个sentinel = object()。当从外部数据源获取并将所有项目放入队列时,producer必须向队列中推送与您拥有的consumer数量相同的sentinel。一旦consumer获取了sentinel,它就知道应该停止:if item is sentinel: break退出循环。

sentinel = object()
consumers_count = 2

async def producer():
    ...  # the same code as above
    if new_item is None:  # if no new data
        for _ in range(consumers_count):
            await queue.put(sentinel)

async def consumer():
    while True:
        ...  # the same code as above
        if item is sentinel:
            break

await asyncio.gather(
    producer(),
    *(consumer() for _ in range(consumers_count)),
)

TL;DR [2]: 一个不太干净的解决方法

由于您需要不更改异步生成器方法,因此这里提供了一种基于 asyncgen 的替代方案。为了以一种简单但不太干净的方式解决此问题,您可以使用锁来包装源异步生成器:

async def with_lock(agen, lock: asyncio.Lock):
    while True:
        async with lock:  # only one consumer is allowed to read
            try:
                item = await anext(agen)
            except StopAsyncIteration:
                break
        # exiting lock section => let others consume
        yield item  # consumer processes an item asyncly

lock = asyncio.Lock()  # a common lock for all consumers
await asyncio.gather(
    # every consumer must have its own "wrapped" generator
    anext(with_lock(datas, lock)),
    anext(with_lock(datas, lock)),
    ...
)

这将确保每次只有一个消费者从生成器中等待获取项目。当此消费者等待时,其他消费者正在执行,因此并行性不会丢失。

重要提示!在上述方法中,不要将yield await anext(agen)作为单个表达式放在lock下:您的包装生成器将挂起(在yield上),并且lock未释放,没有其他消费者能够并行消费另一个项目。即仅使用锁包装anext调用,但不要在锁定部分中yield

使用async for的大致等效代码(看起来更智能):

async def with_lock(agen, lock: asyncio.Lock):
    await lock.acquire()
    async for item in agen:
        lock.release()
        yield item
        await lock.acquire()
    lock.release()

然而,这段代码仅处理异步生成器的anext方法。而生成器API还包括acloseathrow方法。下面进行解释。

虽然您可以将对这些方法的支持添加到with_lock函数中,但我建议要么子类化一个生成器并在内部处理锁支持,要么更好地使用上面的基于Queue的方法。

参见contextlib.aclosing以获取一些灵感。

解释

同步和异步生成器都有一个特殊属性:.gi_running(用于常规生成器)和.ag_running(用于异步生成器)。 您可以通过在生成器上执行dir来发现它们:

>>> dir((i for i in range(0))
[..., 'gi_running', ...]

当生成器的.__next__.__anext__方法被执行时(next(...)anext(...)只是语法糖),它们被设置为True

这可以防止在一个生成器上重新执行next(...),当同一生成器上的另一个next(...)调用已经在执行时:如果运行标志为True,则会引发异常(对于同步生成器,它会引发ValueError: generator already executing)。

因此,回到您的示例,当您运行await anext(datas)(通过asyncio.gather)时,会发生以下情况:

  1. datas.ag_running 被设置为 True
  2. 执行流程进入 datas.__anext__ 方法。
  3. 一旦在 __anext__ 方法内部达到了一个内部的 await 语句(例如你的 await asyncio.sleep(1)),asyncio 的循环将切换到另一个消费者。
  4. 现在,另一个消费者也尝试调用 await anext(datas),但由于 datas.ag_running 标志仍然被设置为 True,这会导致 RuntimeError

为什么需要这个标志?

生成器的执行可以暂停和恢复。但只能在 yield 语句处。因此,如果生成器在内部的 await 语句处暂停,它就无法“恢复”,因为它的状态不允许。

这就是为什么对生成器进行并行的 next/anext 调用会引发异常的原因:它还没有准备好被恢复,它已经在运行中了。

athrowaclose

生成器的API(同步和异步)不仅包括用于迭代的send/asend方法,还包括:

  • close/aclose用于在退出或异常时释放生成器分配的资源(例如数据库连接)
  • throw/athrow用于通知生成器必须处理异常。

acloseathrow也是异步方法。这意味着如果两个消费者尝试并行关闭/抛出底层生成器,则会遇到相同的问题,因为在关闭(或处理异常)生成器时,它又被关闭(抛出异常)。

同步生成器示例

虽然这是异步生成器的常见情况,但对于同步生成器来说,重现它并不那么简单,因为同步next(...)调用很少被中断。

中断同步生成器的一种方法是运行具有多个消费者的多线程代码(在并行线程中运行),从单个生成器读取。 在这种情况下,当生成器的代码在执行next调用时被中断时,所有其他消费者并行尝试调用next将导致异常。

另一种实现此目的的方法是通过自消耗生成器在生成器相关PEP#255中演示:

>>> def g():
...     i = next(me)
...     yield i
... 
>>> me = g()
>>> next(me)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in g
ValueError: generator already executing

当外部调用next(me)时,它会将me.gi_running设置为True,然后执行生成器函数代码。随后的内部next(me)调用会导致ValueError

结论

生成器(特别是异步)最适合由单个读取器使用。多个消费者的支持很困难,因为需要修补所有生成器方法的行为,因此不鼓励这样做。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接