在异步程序中将Web响应写入文件

6

我正在替换使用 ThreadPoolExecutor 的服务器查询工具实现,改用所有异步调用使用 asyncioaiohttp。由于网络调用是非阻塞 IO,大部分转换都很直接,问题在于保存响应。

我使用的所有示例,甚至是两个库的文档,都使用 asyncio.gather() 收集所有可等待结果。在我的情况下,这些结果可能是几 GB 的文件,而我不想将它们存储在内存中。

有什么适当的方法来解决这个问题吗? 是使用 asyncio.as_completed() 然后:

for f in as_completed(aws):
    earliest_result = await f
    # Assumes `loop` defined under `if __name__` block outside coroutine
    loop = get_event_loop()
    # Run the blocking IO in an exectuor and write to file
    _ = await loop.run_in_executor(None, save_result, earliest_result)

这是否引入了一个线程(假设默认情况下使用 ThreadPoolExecutor),从而使这成为异步、多线程程序,而不是异步、单线程程序?
此外,这是否确保每次只写入 1 个 earliest_result 到文件中?我不希望调用 await loop.run_in_executor(...),然后另一个结果出现,我尝试运行到同一个文件;我可以用一个信号量来限制。
3个回答

3

我建议使用aiohttp Streaming API。将响应直接写入磁盘而不是RAM,从gather返回文件名而不是响应本身。这样做几乎不会使用太多内存。这是我所说的内容的一个小演示:

import asyncio

import aiofiles
from aiohttp import ClientSession


async def make_request(session, url):
    response = await session.request(method="GET", url=url)
    filename = url.split('/')[-1]
    async for data in response.content.iter_chunked(1024):
        async with aiofiles.open(filename, "ba") as f:
            await f.write(data)
    return filename


async def main():
    urls = ['https://github.com/Tinche/aiofiles',
            'https://github.com/aio-libs/aiohttp']
    async with ClientSession() as session:
        coros = [make_request(session, url) for url in urls]
        result_files = await asyncio.gather(*coros)
    print(result_files)


asyncio.run(main())

2
非常聪明的使用了asyncio.gather方法,由@merrydeath提出。我对助手函数进行了微调,如下所示,并获得了巨大的性能提升:
    response = await session.get(url)
    filename = url.split('/')[-1]
    async with aiofiles.open(filename, "ba") as f:
        await f.write(response.read())

根据下载连接速度,结果可能有所不同。


0
在我的情况下,这些结果可能是几个GB的文件,我不想将它们存储在内存中。
如果我理解正确,在您的代码中单个aws表示下载单个文件,您可能会遇到以下问题:虽然as_completed允许尽快将数据从RAM交换到HDD,但是所有aws同时运行并将各自的数据(部分下载文件的缓冲区)存储在RAM中。
为了避免这种情况,您需要使用信号量来确保首先不要下载太多文件,从而防止过度使用RAM。
这里是使用semaphore的示例。
“这是否引入了一个线程(假设我默认使用ThreadPoolExecutor),从而使其成为异步多线程程序而不是异步单线程程序?”
我不确定我是否理解了您的问题,但是是的,您的代码将使用线程,但仅save_result将在这些线程中执行。所有其他代码仍在单个主线程中运行。没有任何问题。
此外,这是否确保每次只写入一个最早的结果到文件中?
是的,它确实如此[*]。更准确地说,您代码片段中最后一行的关键字await将确保它:
_ = await loop.run_in_executor(None, save_result, earliest_result)

您可以这样阅读它:“异步地执行run_in_executor,并在此行暂停执行流,直到run_in_executor完成并返回结果”。
[*] 是的,如果你一开始不并行运行多个 f in as_completed(aws) 循环的话。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接