异步生成器上的异步for循环

9

拥有一个异步生成器,我希望能够异步地迭代它。然而,我可能错过了某些东西,或者弄错了什么,最终我得到的是一个常规的同步for循环:

import asyncio


async def time_consuming(t):
    print(f"Going to sleep for {t} seconds")
    await asyncio.sleep(t)
    print(f"Slept {t} seconds")
    return t


async def generator():
    for i in range(4, 0, -1):
        yield await time_consuming(i)


async def consumer():
    async for t in generator():
        print(f"Doing something with {t}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(consumer())
    loop.close()

这将需要大约12秒钟来运行并返回以下内容:
Going to sleep for 4 seconds
Slept 4 seconds
Doing something with 4
Going to sleep for 3 seconds
Slept 3 seconds
Doing something with 3
Going to sleep for 2 seconds
Slept 2 seconds
Doing something with 2
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1

尽管我预计它需要大约4秒钟才能运行并返回类似以下的内容:
Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 4 seconds
Doing something with 4
Slept 3 seconds
Doing something with 3
Slept 2 seconds
Doing something with 2
Slept 1 seconds
Doing something with 1
1个回答

10

异步生成器并不意味着您可以同时执行迭代!您所获得的只是让协程有更多的地方可以通过yield暂停以便其他任务运行。 迭代步骤仍然按顺序运行。

换句话说:异步迭代器对于需要使用I/O获取每个迭代步骤的迭代器非常有用。例如在Web套接字或文件中循环读取行。如果每个迭代器的next()步骤都需要等待慢速I/O源提供数据,那么就可以将控制权切换到已设置为并发运行的其他任务。

如果您希望您的生成器的每个单独步骤都同时运行,那么您仍然需要显式地安排额外的任务与事件循环一起运行。

当所有这些额外的任务完成后,您可以从生成器返回结果。 如果您将4个time_consuming()协程安排为任务,则使用asyncio.wait()等待一个或所有任务完成,并从完成的任务中yield结果,那么在完成for i in range(...): 循环后,您的进程总共只需要4秒:

async def generator():
    pending = []
    for i in range(4, 0, -1):
        pending.append(asyncio.create_task(time_consuming(i)))

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()

在那个时候输出变得

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1
Slept 2 seconds
Doing something with 2
Slept 3 seconds
Doing something with 3
Slept 4 seconds
Doing something with 4

请注意,这是与您预期输出相反的顺序,因为它会在任务完成时获取任务结果,而不是等待第一个任务完成。通常,这正是您想要的。当您已经在1秒后获得了结果时,为什么要等待4秒呢?

您也可以使用您自己的变体,但需要以不同的方式编写代码。然后,您可以在这4个任务上使用asyncio.gather(),该函数会将一堆协程作为并发任务运行,并将它们的结果作为列表返回,之后您可以返回这些结果:

async def generator():
    tasks = []
    for i in range(4, 0, -1):
        tasks.append(time_consuming(i))

    for res in await asyncio.gather(*tasks):
        yield res 

但现在输出结果变成了

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Slept 2 seconds
Slept 3 seconds
Slept 4 seconds
Doing something with 4
Doing something with 3
Doing something with 2
Doing something with 1

由于最长的任务 time_consuming(4)尚未完成,因此在此之前我们无法执行任何其他操作,但是在此点之前,较短时间运行的任务已经完成并输出了它们的睡眠...秒消息。


1
@RodrigoMartins:这就像是 asyncio.create_task() 一样简单。 - Martijn Pieters
1
@RodrigoMartins:抱歉,我用队列过于复杂化了。在这里,你只需要等待任务完成,这样done就拥有了我们所需的所有信息。 - Martijn Pieters
@MartjinPieters 我只是在测试不使用队列,谢谢! - Rodrigo Oliveira
1
@user4815162342 谢谢,是的,当然。我现在去拿我的棕色纸袋。 - Martijn Pieters
@MartijnPieters :D 注意,答案中还有几个地方连续使用了相同的方式。 - user4815162342
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接