Python asyncio.gather with a dictionary


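I know how to build a collection of async tasks from a list and run them with asyncio.gather(), but I can't figure out how to do the same thing with a dictionary: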

    from fastapi import FastAPI
    import asyncio
    import httpx

    app = FastAPI()

    urls = [
        "http://www.google.com",
        "https://www.example.org",
        "https://stackoverflow.com/",
        "https://www.wikipedio.org"
    ]

    async def async_request_return_dict(url: str):
        async with httpx.AsyncClient() as client:
            r = await client.get(url)
        return {url : r.status_code}

    # List of async coroutines
    @app.get("/async_list")
    async def async_list():

        tasks = []
        for url in urls:
            tasks.append(async_request_return_dict(url=url))
        response = await asyncio.gather(*tasks)

        # Convert list of dict -> dict
        dict_response = dict()
        for url in response:
            dict_response.update(url)

        return dict_response

    async def async_request_return_status(url: str):
        async with httpx.AsyncClient() as client:
            r = await client.get(url)
        return r.status_code

    # Dict of async coroutines
    @app.get("/async_dict")
    async def async_dict():

        tasks = dict()
        for url in urls:
            tasks[url] = async_request_return_status(url=url)

        ### How do you run async co-routines inside a dict??? ###
        await asyncio.gather(*tasks.values())
        
        print(tasks)

        return tasks    

Output of /async_list:

    {
        "http://www.google.com":200,
        "https://www.example.org":200,
        "https://stackoverflow.com/":200,
        "https://www.wikipedio.org":200
    }

Traceback for /async_dict:

    INFO:     Application startup complete.
    {'http://www.google.com': <coroutine object async_request_return_status at 0x7fec83ded6c0>, 'https://www.example.org': <coroutine object async_request_return_status at 0x7fec83ded740>, 'https://stackoverflow.com/': <coroutine object async_request_return_status at 0x7fec83ded7c0>, 'https://www.wikipedio.org': <coroutine object async_request_return_status at 0x7fec83ded840>}
    INFO:     127.0.0.1:58350 - "GET /async_dict HTTP/1.1" 500 Internal Server Error
    ERROR:    Exception in ASGI application
    Traceback (most recent call last):
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
        result = await app(self.scope, self.receive, self.send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
        return await self.app(scope, receive, send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
        await super().__call__(scope, receive, send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
        await self.middleware_stack(scope, receive, send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
        raise exc from None
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
        await self.app(scope, receive, _send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
        raise exc from None
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
        await self.app(scope, receive, sender)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
        await route.handle(scope, receive, send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
        await self.app(scope, receive, send)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
        response = await func(request)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 209, in app
        response_data = await serialize_response(
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/fastapi/routing.py", line 137, in serialize_response
        return jsonable_encoder(response_content)
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/fastapi/encoders.py", line 90, in jsonable_encoder
        encoded_value = jsonable_encoder(
      File "/home/cmaggio/repos/async_dict/.venv/lib/python3.8/site-packages/fastapi/encoders.py", line 141, in jsonable_encoder
        raise ValueError(errors)
    ValueError: [TypeError("'coroutine' object is not iterable"), TypeError('vars() argument must have __dict__ attribute')]

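Basically, how can I use asyncio.gather() to run the coroutines stored in a dictionary? Or should I stick with the first approach: put the coroutines in a list, wait until all the awaitables have resolved, and then rebuild the dictionary?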

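The error looks a bit odd, can you post the full traceback? - user4815162342
I'm basically trying to run async coroutines inside a dictionary using asyncio.gather(). - captobvious
I can't reproduce the problem. Maybe provide a minimal reproducible example with 2-3 real URLs. - Vikash Balasubramanian
The question has been updated with a sample application written in FastAPI and the traceback. - captobvious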
1 Answer

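This approach doesn't work the way you expect because the values in your dictionary are coroutine objects, while the actual result values are what asyncio.gather returns. Awaiting gather does not replace the coroutines stored in the dictionary with their results.
From the asyncio.gather documentation:
The order of result values corresponds to the order of awaitables in aws.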
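
To illustrate that guarantee, here is a minimal sketch. The `delayed` helper and its delays are made up for this illustration and are not part of the original answer. The slower awaitable is passed first and still comes first in the result list:

```python
import asyncio


async def delayed(value: str, delay: float) -> str:
    # Hypothetical helper: sleep for `delay` seconds, then return `value`.
    await asyncio.sleep(delay)
    return value


async def main() -> list:
    # "slow" finishes last, but gather() returns results in argument order.
    return await asyncio.gather(
        delayed("slow", 0.05),
        delayed("fast", 0.01),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))
```
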
Based on this fact, you can try something like this:
test.py:

    import asyncio

    import httpx


    URLS = (
        "http://www.google.com",
        "https://www.example.org",
        "https://stackoverflow.com/",
        "https://www.wikipedio.org",
    )


    async def req(url):
        async with httpx.AsyncClient() as client:
            r = await client.get(url)

        return r.status_code


    async def main():
        tasks = []

        for url in URLS:
            tasks.append(asyncio.create_task(req(url)))

        results = await asyncio.gather(*tasks)

        print(dict(zip(URLS, results)))


    if __name__ == "__main__":
        asyncio.run(main())

Test:

    $ python test.py
    {'http://www.google.com': 200, 'https://www.example.org': 200, 'https://stackoverflow.com/': 200, 'https://www.wikipedio.org': 200}
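
If you would rather keep the dictionary shape from the question, the same zip() idea can be wrapped in a small helper. This is only a sketch under stated assumptions: `gather_dict` is a hypothetical name (not an asyncio function), and `fake_status` with its example URLs is a stand-in for the real httpx request so the snippet runs without network access.

```python
import asyncio


async def gather_dict(tasks: dict) -> dict:
    """Await every value of `tasks` concurrently and return {key: result}."""
    # dicts preserve insertion order, so keys() and values() line up,
    # and gather() returns results in the order of its arguments.
    results = await asyncio.gather(*tasks.values())
    return dict(zip(tasks.keys(), results))


async def fake_status(url: str) -> int:
    # Stand-in for an HTTP request; no real network call is made.
    await asyncio.sleep(0.01)
    return 200 if url.startswith("https") else 301


async def main() -> dict:
    urls = ("http://a.example", "https://b.example")
    return await gather_dict({url: fake_status(url) for url in urls})


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The helper returns a new dictionary rather than mutating the one passed in, so the caller never ends up holding a mix of coroutine objects and results.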

Awesome, thanks for the answer. I'll be using zip() from now on. - captobvious
