创建一个生成器,当协程完成时将协程结果作为生成器的输出。

10
目前,我有一个效率低下的同步生成器,它按顺序发出许多HTTP请求并产生结果。我想使用 asyncioaiohttp 并行处理请求,从而加速这个生成器,但我希望将其保留为普通生成器(而不是PEP 525异步生成器),以便调用它的非异步代码无需修改。如何创建这样的生成器?
2个回答

21

asyncio.as_completed()接收一个协程或Future对象的可迭代对象,返回Future对象的可迭代对象,按输入的Future对象完成的顺序排列。通常情况下,您将循环遍历其结果,并在async函数内部await成员对象...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

# Prints 'second', then 'third', then 'first'
asyncio.run(main())

...但是为了回答这个问题,我们希望能够从普通生成器中产生这些结果,以便正常的同步代码可以消费它们,而不需要知道async函数在底层被使用。我们可以通过调用由as_completed调用产生的future上的loop.run_until_complete()来实现这一点...

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)

通过这种方式,我们已经将异步代码暴露给了非异步环境,而不需要调用者将任何函数定义为async,甚至不需要知道ordinary_generator在幕后使用了asyncio

作为ordinary_generator()的备选实现,在某些情况下提供更大的灵活性,我们可以多次使用带有FIRST_COMPLETED标志的asyncio.wait()来代替循环as_completed()

import concurrent.futures

def ordinary_generator():
    loop = asyncio.get_event_loop()
    pending = [first(), second(), third()]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()

这种方法是通过维护一个“pending”作业列表来实现的,它的优点在于我们可以根据需要随时将作业添加到“pending”列表中。这在异步作业可能会向队列中添加不可预测数量的进一步作业的使用情况下非常有用,比如网络爬虫访问每个页面上的全部链接。

需要注意的是:以上方法假定我们从主线程调用同步代码,在这种情况下,get_event_loop保证会给我们一个循环并且我们不需要.close它。如果我们想要从非主线程(特别是可能之前已经创建了事件循环的线程)使用ordinary_generator,那么情况就会变得更加复杂,因为我们不能依赖get_event_loop(它会在任何尚未拥有事件循环的非主线程上引发RuntimeError)。在这种情况下,我认为最简单的做法是启动一个新线程来运行我们的asyncio代码,并通过一个队列与其通信:

def ordinary_generator():
    sentinel = object()
    queue = Queue()

    def thread_entry_point():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for future in asyncio.as_completed([first(), second(), third()]):
            try:
                queue.put(loop.run_until_complete(future))
            except Exception as e:
                queue.put((sentinel, e))
                break
        loop.close()
        queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        val = queue.get()
        if val is sentinel:
            return
        if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
            raise val[1]
        yield val

将倒数第二个示例中使用的run_until_complete与最后一个示例中使用额外线程的方法相结合,留给需要这样做的任何读者作为练习。


0

Mark的回答很好,但我想提供一个不依赖于低级事件循环方法的不同实现。

关键区别在于,不是使用yield,而是提供一个回调函数来处理结果:

import asyncio
import random

async def do_stuff():
    proc_time = round(random.random(), 2)
    print('START: ', proc_time)
    await asyncio.sleep(proc_time)
    return proc_time


def concurrent_stuff(awaitables, callback):
    # Must be async to wait
    async def _as_completed():
        for coro in asyncio.as_completed(awaitables):
            result = await coro
            callback(result)  # Send result to callback.

    # Perform the async calls inside a regular method
    asyncio.run(_as_completed())


def when_done(result):
    print('FINISHED: ', result)


def main():
    awaitables = [do_stuff() for _ in range(5)]
    concurrent_stuff(awaitables, when_done)


main()

# START:  0.56
# START:  0.98
# START:  0.39
# START:  0.23
# START:  0.94
# FINISHED:  0.23
# FINISHED:  0.39
# FINISHED:  0.56
# FINISHED:  0.94
# FINISHED:  0.98

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接