如何在FastAPI中上传CSV文件并将其转换为JSON?

5
我正在尝试将我的.csv文件上传到我的FastAPI服务器,然后将其转换为JSON并返回给客户端。但是,当我尝试直接处理它(而不将其存储在某个地方)时,我会遇到以下错误:
Error : FileNotFoundError: [Error 2] No such file or directory : "testdata.csv"

这是我的FastAPI代码:
async def upload(file: UploadFile = File(...)):
    data = {}    
    with open(file.filename,encoding='utf-8') as csvf:
        csvReader = csv.DictReader(csvf)
        for rows in csvReader:             
            key = rows['No']
            data[key] = rows    
    return {data}```


os.getcwd()的输出是什么?它和testdata.csv的位置相同吗? - BrokenBenchmark
实际上,我直接在用户界面上传文件,没有存储在任何地方,所以当我使用getcwd()命令时,我确实得到了200响应代码,但响应体为[ null ]。 - sourabh
3个回答

16

以下是将上传的.csv文件转换为JSON的各种选项。以下示例中使用以下.csv样本文件。

data.csv

Id,name,age,height,weight
1,Alice,20,62,120.6
2,Freddie,21,74,190.6
3,Bob,17,68,120.0

选项1

csv.DictReader()方法也可以接受文件对象作为file参数。FastAPI的UploadFile使用Python的SpooledTemporaryFile, 一个类似文件的对象(关于这个,请看这个答案)。你可以通过UploadFile对象的.file属性访问它。然而,由于FastAPI/Starlette以bytes模式打开文件,如果直接将其传递给csv.DictReader()方法,你会得到一个错误,即_csv.Error: iterator should return strings, not bytes。因此,你可以使用codecs.iterdecode()(如这个答案所建议的)来使用增量解码器迭代地解码输入(在这种情况下从bytesstr)。例如:

from fastapi import FastAPI, File, UploadFile
import csv
import codecs

app = FastAPI()
    
@app.post("/upload")
def upload(file: UploadFile = File(...)):
    csvReader = csv.DictReader(codecs.iterdecode(file.file, 'utf-8'))
    data = {}
    for rows in csvReader:             
        key = rows['Id']  # Assuming a column named 'Id' to be the primary key
        data[key] = rows  
    
    file.file.close()
    return data

输出

{
  "1": {
    "Id": "1",
    "name": "Alice",
    "age": "20",
    "height": "62",
    "weight": "120.6"
  },
  "2": {
    "Id": "2",
    "name": "Freddie",
    "age": "21",
    "height": "74",
    "weight": "190.6"
  },
  "3": {
    "Id": "3",
    "name": "Bob",
    "age": "17",
    "height": "68",
    "weight": "120.0"
  }
}

如果你想要返回一个字典列表,可以使用下面的代码。由于下面的代码需要在返回结果时打开文件,因此会阻止服务器在完成后正确关闭文件(通过调用file.file.close())。因此,可以使用BackgroundTasks(在返回响应后运行)来关闭文件:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
import csv
import codecs

app = FastAPI()

@app.post("/upload")
def upload(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    csvReader = csv.DictReader(codecs.iterdecode(file.file, 'utf-8'))
    background_tasks.add_task(file.file.close)
    return list(csvReader)

输出

[
  {
    "Id": "1",
    "name": "Alice",
    "age": "20",
    "height": "62",
    "weight": "120.6"
  },
  {
    "Id": "2",
    "name": "Freddie",
    "age": "21",
    "height": "74",
    "weight": "190.6"
  },
  {
    "Id": "3",
    "name": "Bob",
    "age": "17",
    "height": "68",
    "weight": "120.0"
  }
]

方案二

另一个解决方案是读取上传文件的字节数据,使用contents = file.file.read()(关于async读/写,请参见此答案),然后将字节转换为字符串,并最终将它们加载到内存文本缓冲区中(即StringIO),如此处所述,可以将其传递给csv.DictReader()。以下是示例:

from fastapi import FastAPI, File, UploadFile
import csv
from io import StringIO

app = FastAPI()
    
@app.post("/upload")
def upload(file: UploadFile = File(...)):
    data = {}
    contents = file.file.read()
    buffer = StringIO(contents.decode('utf-8'))
    csvReader = csv.DictReader(buffer)
    for row in csvReader:  
        key = row['Id']  # Assuming a column named 'Id' to be the primary key
        data[key] = row  
    
    buffer.close()
    file.file.close()
    return data

选项 3

按照您的方法解决问题,即使用文件路径读取 csv 文件,而不是像前面描述的直接使用文件内容或文件对象。您可以将文件内容复制到 NamedTemporaryFile 中,该对象与 UploadFile 提供的 SpooledTemporaryFile 不同,它“在文件系统中有一个可见名称”,可用于打开文件(有关更多信息,请参见 此答案)。下面是一个有效的示例:

from fastapi import FastAPI, File, UploadFile
from tempfile import NamedTemporaryFile
import os
import csv

app = FastAPI()
    
@app.post("/upload")
def upload(file: UploadFile = File(...)):
    data = {}
    temp = NamedTemporaryFile(delete=False)
    try:
        try:
            contents = file.file.read()
            with temp as f:
                f.write(contents);
        except Exception:
            return {"message": "There was an error uploading the file"}
        finally:
            file.file.close()
        
        with open(temp.name,'r', encoding='utf-8') as csvf:
            csvReader = csv.DictReader(csvf)
            for rows in csvReader:             
                key = rows['Id']  # Assuming a column named 'Id' to be the primary key
                data[key] = rows
    except Exception:
        return {"message": "There was an error processing the file"}
    finally:
        #temp.close()  # the `with` statement above takes care of closing the file
        os.remove(temp.name)  # Delete the file
    
    return data

选项4

你也可以将上传文件的字节写入BytesIO流中,然后将其转换为Pandas DataFrame。接下来,使用to_dict()方法(如此答案所述),将DataFrame转换为字典并返回它 - FastAPI在幕后将其转换为JSON兼容数据,使用jsonable_encoder, 最后序列化数据并返回一个JSONResponse(有关详细信息,请参见此答案)。作为更快速的替代方法,您可以使用to_json()方法并直接返回自定义Response,如此答案的选项1(更新2)所述。

from fastapi import FastAPI, File, UploadFile
from io import BytesIO
import pandas as pd

app = FastAPI()
    
@app.post("/upload")
def upload(file: UploadFile = File(...)):
    contents = file.file.read()
    buffer = BytesIO(contents)
    df = pd.read_csv(buffer)
    buffer.close()
    file.file.close()
    return df.to_dict(orient='records')

注意: 如果文件太大,占用了所有的内存和/或处理时间太长并且返回结果太慢,请参考这个答案,以及这个答案这个答案


file.read() 必须改为 file.file.read() - snowmanstark
@snowmanstark 所有的 UploadFile 方法 "在底层调用相应的文件方法(使用内部的 SpooledTemporaryFile"。 请查看文档 - Chris
我之前说错了,应该是 "The file.read() must be file.file.read()"。如果我们想要将以下代码转换: """ contents = await file.read() decoded = contents.decode() buffer = StringIO(decoded) """ 那么正确的写法应该是 buffer = StringIO(file.file.read().decode()) - snowmanstark
除非您已决定使用 def 而不是 async def 声明路由,否则不应将内容的异步读取更改为同步读取。如果在使用 async def 声明路由时这样做,将导致整个服务器被阻塞,直到该操作完成。请参考以下引用了解 async/await 的概念:1, 2, 3 - Chris
1
谢谢Chris,我明白你的意思了! - snowmanstark

0
你收到“Error : FileNotFoundError: [Error 2] No such file or directory : "testdata.csv"”的原因是你试图读取一个未存储在本地的文件。
如果想要以这种方式读取该文件,你应该在继续之前保存上传的文件:
async def upload(uploaded_file: UploadFile = File(...)):
    # save csv to local dir
    csv_name = uploaded_file.filename
    csv_path = 'path_to/csv_dir/'
    file_path = os.path.join(csv_path, csv_name)
    with open(file_path, mode='wb+') as f:
        f.write(uploaded_file.file.read())

    # read csv and convert to json
    data = {}
    with open(file_path, mode='r', encoding='utf-8') as csvf:
        csvReader = csv.DictReader(csvf)
        for rows in csvReader:             
            key = rows['No']
            data[key] = rows    
    return {data}

但是我试图直接完成它,就像在UI中上传文件时直接获取并转换为JSON格式,然后将该JSOn数据保存到MySQL数据库中,但我的代码只获取已上传文件的名称,而不是实际文件本身。 - sourabh
你尝试过在函数定义后直接使用 content = await file.read() 吗?然后你可以使用 file.content_type 属性获取文件扩展名来确定处理方法。 - Lars

0
在异步函数upload()中,file已经打开,您可以直接从中提取字符,无需再次打开。此外,在FastAPI中,类UploadFile实际上是从标准库tempfile.SpooledTemporaryFile派生而来的,不能通过指定临时文件的路径来访问它。
例如,如果您在Unix-like系统中使用CPython并在upload()中读取file.filename的值,则返回一个数字而不是格式良好的路径,因为任何SpooledTemporaryFile类的实例都将创建一个文件描述符(当当前存储的数据超过max_size时),并简单地返回文件描述符(在Unix中应该是一个数字)以访问SpooledTemporaryFile.filename

那么我应该如何使用这个Spooled临时文件,我非常困惑。 - sourabh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接