等待Python异步生成器

5

假设我有两个异步生成器:

async def get_rules():
    while True:
        yield 'rule=1'
        asyncio.sleep(2)


async def get_snapshots():
    while True:
        yield 'snapshot=1'
        asyncio.sleep(5)

我希望将它们合并成一个异步生成器,返回含有两个值的二元组,这两个值都是最新的。类似于combineLatest的功能。
那么,最好的方法是什么?

你能澄清一下你希望合并生成器产生输出的时机吗?它只有在两个子生成器都产生输出时才会产生输出,还是只要其中一个产生输出就会产生输出? - Blckknght
@Blckknght 当其中一个函数执行时,就会发生这种情况。尽管我对asyncio的了解越多,我越不确定这是否有效。我开始认为asyncio希望我使用任务,并通过队列或通道等方式来传递这些函数的结果。 - miracle2k
2个回答

7

您可能会对 aiostream 感兴趣,特别是 stream.mergestream.accumulate 这两个操作:

import asyncio
from itertools import count
from aiostream import stream


async def get_rules():
    for x in count():
        await asyncio.sleep(2)
        yield 'rule', x


async def get_snapshots():
    for x in count():
        await asyncio.sleep(5)
        yield 'snapshot', x


async def main():
    xs = stream.merge(get_rules(), get_snapshots())
    ys = stream.map(xs, lambda x: {x[0]: x[1]})
    zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {})

    async with zs.stream() as streamer:
        async for z in streamer:
            print(z)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

输出:

{}
{'rule': 0}
{'rule': 1}
{'rule': 1, 'snapshot': 0}
{'rule': 2, 'snapshot': 0}
[...]

请查看项目页面文档以获取更多信息。
免责声明:我是项目的维护者。

只要你的项目可以使用GPL许可证,这个库非常不错。 - Rotareti
@Rotareti 谢谢!就我个人而言,我对使用Copyleft许可证感到满意 :) - Vincent

1
我想到了这个:

我想到了这个:

async def combine(**generators):
    """Given a bunch of async generators, merges the events from
    all of them. Each should have a name, i.e. `foo=gen, bar=gen`.
    """
    combined = Channel()
    async def listen_and_forward(name, generator):
        async for value in generator:
            await combined.put({name: value})
    for name, generator in generators.items():
        asyncio.Task(listen_and_forward(name, generator))

    async for item in combined:
        yield item


async def combine_latest(**generators):
    """Like "combine", but always includes the latest value from
    every generator.
    """
    current = {}
    async for value in combine(**generators):
        current.update(value)
        yield current

Call it like so:

async for item in combine_latest(rules=rulesgen, snap=snapgen):
    print(item)

输出看起来像这样:

{'rules': 'rule-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
....

我正在使用aiochannel,但普通的asyncio.Queue也可以。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接