Python异步生成器不是异步的

7
我的代码如下。我希望两个sleep可以共享同一时间框架,并且需要1+2*3=7秒才能运行脚本。 但似乎出了些问题,导致它仍然需要3*(1+2)秒才能完成。
有没有任何想法可以修改这个代码?
import asyncio

async def g():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for x in g():
        print(x)
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
res = loop.run_until_complete(main())
loop.close()


你在yield之前仍然在g中运行sleep,因此在main的sleep之前。 async/await语法存在是为了同时执行多个任务,而不是同时执行一个任务。 你只有一个任务,因此没有任何东西可以同时运行。 - MisterMiyagi
我假设你的“sleep”是在你的实际代码中进行一些实际工作?它是计算还是I/O操作? - MisterMiyagi
1
是的,我期望 gmain 睡眠时准备好下一个 isleep 是我实际代码中的一些实际工作吗? - cjs840312
2个回答

8
async/await 的目的是交替执行 任务,而不是函数/生成器。例如,当您使用 await asyncio.sleep(1) 时,您当前的协程会随着 sleep 被延迟。同样,async for 会延迟其协程,直到下一个项目准备就绪。
为了运行您的单独功能,必须将每个部分创建为单独的任务。使用 Queue 在它们之间交换项目 - 任务只会被延迟,直到它们已经交换了一个项目。
from asyncio import Queue, sleep, run, gather


# the original async generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


async def producer(queue: Queue):
    async for i in g():
        print('send', i)
        await queue.put(i)  # resume once item is fetched
    await queue.put(None)


async def consumer(queue: Queue):
    x = await queue.get()  # resume once item is fetched
    while x is not None:
        print('got', x)
        await sleep(2)
        x = await queue.get()


async def main():
    queue = Queue()
    # tasks only share the queue
    await gather(
        producer(queue),
        consumer(queue),
    )


run(main())

如果您经常需要这个功能,您也可以将其放入一个帮助对象中,该对象包装了异步可迭代器。该帮助程序封装队列和单独的任务。您可以直接在async for语句中应用该帮助程序到异步可迭代器上。

from asyncio import Queue, sleep, run, ensure_future


# helper to consume iterable as concurrent task
async def _enqueue_items(async_iterable, queue: Queue, sentinel):
    async for item in async_iterable:
        await queue.put(item)
    await queue.put(sentinel)


async def concurrent(async_iterable):
    """Concurrently fetch items from ``async_iterable``"""
    queue = Queue()
    sentinel = object()
    consumer = ensure_future(  # concurrently fetch items for the iterable
        _enqueue_items(async_iterable, queue, sentinel)
    )
    try:
        item = await queue.get()
        while item is not sentinel:
            yield item
            item = await queue.get()
    finally:
        consumer.cancel()


# the original generator
async def g():
    for i in range(3):
        await sleep(1)
        yield i


# the original main - modified with `concurrent`
async def main():
    async for x in concurrent(g()):
        print(x)
        await sleep(2)


run(main())

@cjs840312 由于这是一个经常发生的模式,我现在添加了一个示例助手,可以包装现有的异步生成器。 - MisterMiyagi

4
作为使用队列的替代方案,这个解决方案将 Future 链接在一起,使得 Future 的结果是当前项目,并且另一个 Future 用来检索下一个项目(有点像链表):
from asyncio import sleep, get_event_loop, run, create_task

async def aiter(fut, async_generator):
    try:
        async for item in async_generator:
            fut, prev_fut = get_event_loop().create_future(), fut
            prev_fut.set_result((item, fut))
        else:
            fut.set_exception(StopAsyncIteration())
    except Exception as e:
        fut.set_exception(e)


async def concurrent(async_generator):
    fut = get_event_loop().create_future()
    create_task(aiter(fut, async_generator))

    try:
        while True:
            item, fut = await fut
            yield item
    except StopAsyncIteration as e:
        return

作为额外的奖励,这个解决方案将正确地处理在g()中发生的异常,并通过在main()方法中重新引发异常以及对调试有用的回溯来使其具备更好的可读性。

非常好,因为它保证了恒定的内存使用! - Labo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接