我有一个大文件,每一行都有一个JSON记录。我正在编写一个脚本,通过API将其中的子集上传到CouchDB,并尝试使用不同的方法来确定最快的方法。以下是我在本地CouchDB实例上找到的最快到最慢的方法:
将每个所需的记录读入内存。当所有记录都在内存中时,为每个记录生成上传协程,并同时收集/运行所有协程。
同步读取文件,遇到所需记录时,同步上传。
使用
aiofiles
读取文件,遇到所需记录时进行异步更新。
第1种方法比其他两种(约快两倍)要快得多。我对第2种方法比第3种方法更快感到困惑,特别是与这里的示例形成对比,后者异步运行的时间只有同步运行的一半(未提供同步代码,我不得不自己重写)。是否由于从文件 I/O 切换到 HTTP I/O 的上下文切换,特别是文件读取比 API 上传发生得更频繁?
另外,以下是每种方法的 Python 伪代码表示:
方法 1 - 同步文件 I/O,异步 HTTP I/O
import json
import asyncio
import aiohttp
records = []
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
records.append(record)
async def batch_upload(records):
async with aiohttp.ClientSession() as session:
tasks = []
for record in records:
task = async_upload(record, session)
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(batch_upload(properties))
方法2 - 同步文件IO,同步HTTP IO
import json
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
sync_upload(record)
方案三 - 异步文件IO,异步HTTP IO
import json
import asyncio
import aiohttp
import aiofiles
async def batch_upload()
async with aiohttp.ClientSession() as session:
async with open('records.txt', 'r') as record_file:
line = await record_file.readline()
while line:
record = json.loads(line)
if valid(record):
await async_upload(record, session)
line = await record_file.readline()
asyncio.run(batch_upload())
我正在开发的文件大小约为1.3GB,总共有10万条记录,我上传了其中的691条。每次上传都会发出一个GET请求以查看CouchDB中是否已存在该记录。如果存在,则使用PUT更新CouchDB记录中的任何新信息;如果不存在,则使用POST将该记录发送到数据库中。因此,每次上传包含两个API请求。为了开发目的,我仅创建记录,因此运行GET和POST请求,总计1382个API调用。
方法一需要大约17秒,方法二需要大约33秒,方法三需要大约42秒。