为什么使用Python异步比同步从文件中读取和调用API速度更慢?

3

我有一个大文件,每一行都有一个JSON记录。我正在编写一个脚本,通过API将其中的子集上传到CouchDB,并尝试使用不同的方法来确定最快的方法。以下是我在本地CouchDB实例上找到的最快到最慢的方法:

  1. 将每个所需的记录读入内存。当所有记录都在内存中时,为每个记录生成上传协程,并同时收集/运行所有协程。

  2. 同步读取文件,遇到所需记录时,同步上传。

  3. 使用 aiofiles 读取文件,遇到所需记录时进行异步更新。

第1种方法比其他两种(约快两倍)要快得多。我对第2种方法比第3种方法更快感到困惑,特别是与这里的示例形成对比,后者异步运行的时间只有同步运行的一半(未提供同步代码,我不得不自己重写)。是否由于从文件 I/O 切换到 HTTP I/O 的上下文切换,特别是文件读取比 API 上传发生得更频繁?

另外,以下是每种方法的 Python 伪代码表示:

方法 1 - 同步文件 I/O,异步 HTTP I/O

import json
import asyncio
import aiohttp

records = []
with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            records.append(record)

async def batch_upload(records):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for record in records:
            task = async_upload(record, session)
            tasks.append(task)  
        await asyncio.gather(*tasks)

asyncio.run(batch_upload(properties))

方法2 - 同步文件IO,同步HTTP IO

import json

with open('records.txt', 'r') as record_file:
    for line in record_file:
        record = json.loads(line)
        if valid(record):
            sync_upload(record)

方案三 - 异步文件IO,异步HTTP IO

import json
import asyncio
import aiohttp
import aiofiles

async def batch_upload()
    async with aiohttp.ClientSession() as session:
        async with open('records.txt', 'r') as record_file:
            line = await record_file.readline()
            while line:
                record = json.loads(line)
                if valid(record):
                    await async_upload(record, session)
                line = await record_file.readline()

asyncio.run(batch_upload())

我正在开发的文件大小约为1.3GB,总共有10万条记录,我上传了其中的691条。每次上传都会发出一个GET请求以查看CouchDB中是否已存在该记录。如果存在,则使用PUT更新CouchDB记录中的任何新信息;如果不存在,则使用POST将该记录发送到数据库中。因此,每次上传包含两个API请求。为了开发目的,我仅创建记录,因此运行GET和POST请求,总计1382个API调用。
方法一需要大约17秒,方法二需要大约33秒,方法三需要大约42秒。

1
你能加上一些时间吗?你执行了多少个HTTP请求?我知道它们取决于你的系统/网络,但对于确定你的用例很有用。 - Sam Mason
我已更新帖子,包括时间和请求数据。 - James Kelleher
1个回答

2

你的代码使用了异步(async)关键字,但实际上它是同步执行的,在这种情况下,它会比同步方法更慢。如果没有有效地构建/使用异步方法,异步并不能加速执行。

你可以创建两个协程(coroutine)并让它们并行运行,也许这样可以加速操作。

示例:

最初的回答
#!/usr/bin/env python3

import asyncio


async def upload(event, queue):
    # This logic is not so correct when it comes to shutdown,
    # but gives the idea
    while not event.is_set():
        record = await queue.get()
        print(f'uploading record : {record}')
    return


async def read(event, queue):
    # dummy logic : instead read here and populate the queue.
    for i in range(1, 10):
        await queue.put(i)
    # Initiate shutdown..
    event.set()


async def main():
    event = asyncio.Event()
    queue = asyncio.Queue()

    uploader = asyncio.create_task(upload(event, queue))
    reader = asyncio.create_task(read(event, queue))
    tasks = [uploader, reader]

    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())

我实现了一个基于队列的方法,但它似乎只比同步方法快一点点(大约30秒到33秒左右)。最快的方法仍然是将所有内容加载到内存中,并在不进行任何文件I/O的情况下异步上传。有什么想法为什么会这样?也许这是因为在我的特定情况下读取行数与上传行数的比例相对较低? - James Kelleher
刚刚测试了从文件中上传每个记录,而不仅仅是其中的一部分,结果在这种情况下,队列似乎比批量上传更快! - James Kelleher

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接