如何使用迭代器包装asyncio

7

我有以下简化的代码:

async def asynchronous_function(*args, **kwds):
    statement = await prepare(query)
    async with conn.transaction():
        async for record in statement.cursor():
            ??? yield record ???

...

class Foo:

    def __iter__(self):
        records = ??? asynchronous_function ???
        yield from records

...

x = Foo()
for record in x:
    ...

我不知道如何填写上面的???。 我想得到记录数据,但是如何包装asyncio代码并不明显。


将异步代码和阻塞代码混合使用通常是一个不好的主意,用async for record in x替换for record in x是否可行? - Vincent
问题在于一旦我使用了异步,我就必须将其全部推到堆栈的顶部 - 我不想重写整个堆栈以符合异步的风格。或者换句话说,我有这段代码可以在没有异步的情况下工作,但我想尝试异步代码,看看它是否更高效。我看到的所有示例都是玩具示例... - Brian Bruggeman
1
嗯,异步编程通常并不提供性能。它确实提供了协作式多任务处理,但您通常需要对每个阻塞调用使用异步/等待范例才能看到其优势。在这种情况下,“for record in x:”确实是一个阻塞调用。 - Vincent
Asyncio更多关注的是可伸缩性而非性能。使用线程可以轻松地与50个同事进行交流;但是,如果要与500或者5000人进行交流,则会出现问题,因为您将不得不生成大量的操作系统线程(并在它们之间调试争用问题,特别是与GIL相结合),或者使用线程池并花费不必要的时间等待空闲池中的插槽。Asyncio允许您处理许多连接而无需每个连接都使用操作系统线程,同时保持具有协程的可读代码。请参见我的答案,以了解在非Asyncio程序中使用Asyncio的示例。 - user4815162342
也许在这里跟随评论。我觉得术语在这里正在影响我们的判断。当我提到性能时,我真正考虑的是并发的关键方面,即我在等待I/O时花费的闲置时钟周期。我有独立的工作,如果我有一种释放它们的方法,肯定可以利用这些时钟周期。我认为asyncio可以做到这一点,但与当前同步代码进行交互时非常笨拙。 - Brian Bruggeman
2个回答

9
虽然asyncio旨在全面使用,但有时立即将大型软件(及其所有依赖项)转换为异步是不可能的。幸运的是,有办法将传统同步代码与新编写的asyncio部分结合起来。一种简单的方法是在专用线程中运行事件循环,并使用asyncio.run_coroutine_threadsafe将任务提交给它。
使用这些低级工具,您可以编写通用适配器,将任何异步迭代器转换为同步迭代器。例如:
import asyncio, threading, queue

# create an asyncio loop that runs in the background to
# serve our asyncio needs
loop = asyncio.get_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def wrap_async_iter(ait):
    """Wrap an asynchronous iterator into a synchronous one"""
    q = queue.Queue()
    _END = object()

    def yield_queue_items():
        while True:
            next_item = q.get()
            if next_item is _END:
                break
            yield next_item
        # After observing _END we know the aiter_to_queue coroutine has
        # completed.  Invoke result() for side effect - if an exception
        # was raised by the async iterator, it will be propagated here.
        async_result.result()

    async def aiter_to_queue():
        try:
            async for item in ait:
                q.put(item)
        finally:
            q.put(_END)

    async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
    return yield_queue_items()

然后您的代码只需要调用wrap_async_iter来将异步迭代器包装成同步迭代器:
async def mock_records():
    for i in range(3):
        yield i
        await asyncio.sleep(1)

for record in wrap_async_iter(mock_records()):
    print(record)

在你的情况下,Foo.__iter__ 将使用 yield from wrap_async_iter(asynchronous_function(...))

2
这遵循我的原则 - 如果它不够优雅,那是因为我还没有找到合适的抽象。谢谢。 - Brian Bruggeman

1
如果您想从异步生成器中接收所有记录,可以使用async for或简写的异步推导式
async def asynchronous_function(*args, **kwds):
    # ...
    yield record


async def aget_records():
    records = [
        record 
        async for record 
        in asynchronous_function()
    ]
    return records

如果你想要同步地(即阻塞式地)从异步函数中获取结果,你可以运行这个函数在 asyncio 循环中
def get_records():
    records = asyncio.run(aget_records())
    return records

请注意,一旦在事件循环中运行某个协程,您就失去了同时(即并行)运行此协程与其他协程并因此获得所有相关好处的能力。
正如Vincent在评论中指出的那样,asyncio不是使代码更快的魔术棒,而是一种工具,有时可以用于以低开销并发运行不同的I/O任务。
您可能会对阅读this answer感兴趣,以了解asyncio背后的主要思想。

2
你在这里编写的代码收集了所有来自asyncio的数据,并阻塞直到asyncio运行完成。我可能没有很好地描述我的用例,因为这并不比简单地运行标准的阻塞、非异步IO代码更好。 - Brian Bruggeman
@BrianBruggeman 是的,这并没有改善。我不确定你想做什么:在未获取async for循环中的所有值之前,无法将数据从异步生成器传播到普通同步for循环。您可以尝试以下方式思考:“为什么statement.cursor()可以与async for一起使用,而一开始不能与for一起使用?” - Mikhail Gerasimov
“如果不先获取来自异步 for 循环的全部值,就无法将数据从异步生成器传播到普通同步 for 循环。我认为这就是我想要理解的内容。如果真是这种情况,那么除非完全重写我的代码库或者从头开始使用 asyncio,否则我很可能永远不会使用 asyncio。” - Brian Bruggeman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接