从一个 future 字典中,以惯用方式收集结果。

22

我正在尝试尽可能地写出习惯用语来从存储在字典中的未来结果中收集结果。

让我们假设我有以下代码:

import asyncio

async def sleep(seconds):
    print(f'sleeping for {seconds} seconds')
    await asyncio.sleep(seconds)
    print(f'finished sleeping {seconds} seconds')


async def run():
    tasks = {
        '4': sleep(4),
        '3': sleep(3),
        '2': sleep(2),
        '1': sleep(1),
    }
    print(await gather_from_dict(tasks))


if __name__ == '__main__':
     asyncio.get_event_loop().run_until_complete(run())

我期望的输出是:

sleeping for 2 seconds
sleeping for 1 seconds
sleeping for 4 seconds
sleeping for 3 seconds
finished sleeping 1 seconds
finished sleeping 2 seconds
finished sleeping 3 seconds
finished sleeping 4 seconds
{'4': None, '3': None, '2': None, '1': None}

到目前为止,我发现最清晰的解决方案是:

async def gather_from_dict(tasks:Dict[Hashable, Awaitable],
                           loop=None, return_exceptions=False) -> Dict:

    results = await asyncio.gather(
        *tasks.values(),
        loop=loop,
        return_exceptions=return_exceptions
    )
    return dict(zip(tasks.keys(), results))

有没有更简单的方法实现这个?谢谢!!!


根据@vaultah的建议进行了更新 - jordixou
我认为你的解决方案是迄今为止最干净的:)我认为可能需要改进的是等待dict(zip(tasks.keys(), results)),因为它有点耗时。 - Newskooler
很棒的解决方案!顺便说一下:在Python中,字典现在是有序的,但在未来的Python版本中可能会改变。因此,可能有序字典更加健壮。 - Mark Mishyn
2个回答

1
我将您的任务重新定义为协程列表,以使其更加纯净,并且更喜欢从run_until_complete方法中获取结果,代码如下所示,并注意您在sleep代码中返回了某些内容,在您的代码中,实际上返回了None。
import asyncio


async def sleep(seconds):
    print('sleeping for {seconds} seconds'.format(seconds=seconds))
    await asyncio.sleep(seconds)
    print('finished sleeping {seconds} seconds'.format(seconds=seconds))
    return {seconds: 'value of {seconds}'.format(seconds=seconds)}


if __name__ == '__main__':

    loop = asyncio.get_event_loop()

    tasks = [sleep(i) for i in range(1, 5)]

    finished, _ = loop.run_until_complete(
        asyncio.wait(tasks))

    result = {}

    for task in finished:
        result.update(task.result())

    print(result)
    loop.close()

0
假设: Futures(或协程)将在列表和字典的结构中,而不是在任何自定义对象或元组内部!
给定一些像这样的异步函数:
import asyncio


async def slow(result):
    await asyncio.sleep(0.1)
    return result

async def some_complex_stuff_here():
    return {
        'a': slow(1),
        'b': [
            slow('hello'),
            {
                'c': slow('fna'),
                1: slow('world'),
            }
        ],
    }

您可以使用以下代码等待所有内容:
def __gather_futures(container, path, futures):
    if isinstance(container, dict):
        for key, value in container.items():
            sub_path = path + (key, )
            __gather_futures(value, sub_path, futures)

    if isinstance(container, list):
        for idx, value in enumerate(container):
            sub_path = path + (idx,)
            __gather_futures(value, sub_path, futures)

    if inspect.isawaitable(container):
        futures.append((path, container))


async def gather_object_futures(container):
    futures = []
    wrapper = {'content': container}
    __gather_futures(container=wrapper, path=(), futures=futures)
    results = await asyncio.gather(*[f[1] for f in futures])
    for (path, future), result in zip(futures, results):
        current_object = wrapper
        for key in path[:-1]:
            current_object = current_object[key]
        current_object[path[-1]] = result

    return wrapper['content']

你可以使用以下方式调用:

async def run():
    return await gather_object_futures(await some_complex_stuff_here())

import time
time_start = time.time()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(run())

# will print something like 100ms --> all futures have been gathred at once!
print(time.time() - time_start)  

# result contains all of the resolved results
print(result

请注意:在 gather_object_futures 函数调用内部使用 await 是至关重要的!!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接