使用SQLAlchemy将CSV文件加载到数据库中

62
我想把csv文件加载到数据库中。
6个回答

60

如果你的CSV文件很大,使用INSERT语句是非常低效的。你应该使用批量加载机制,不同的数据库有不同的方法。例如,在PostgreSQL中,你应该使用"COPY FROM"方法:

with open(csv_file_path, 'r') as f:    
    conn = create_engine('postgresql+psycopg2://...').raw_connection()
    cursor = conn.cursor()
    cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
    cursor.copy_expert(cmd, f)
    conn.commit()

9
对于任何重要的情况,您确实希望直接使用psycopg中的copy_fromcopy_expert。此解决方案可以一次性插入数百万行数据。 - Peter Kilczuk
有没有办法在不导入庞大的库的情况下实现这个功能? - Sajuuk
@Sajuuk:这个解决方案实际上只能使用psycopg2(它包含在sqlalchemy中),从而大大减少了它的依赖性(因为我们在这里并没有真正使用它,所以不需要依赖于全部的sqlalchemy)。 其想法是直接用psycopg的psycopg2.connect("...")替换第一个语句的sqlalchemy.create_engine("...").raw_connection() - bluu
5
我使用SQLAlchemy ORM(更多是出于规定而不是偏好),但这个答案确实应该被接受,至少对于Postgres来说是如此。我有一个包含> 1百万行的CSV文件,使用sqlalchemy-core的“insert”策略花费了9分钟(并且需要1.3 GB内存),这里是 最佳表现。使用COPY ... FROM ...策略和StringIO缓冲区只需要10秒钟,并且仅占用250 MB内存。 - kevlarr

57
因为SQLAlchemy的强大,我也在一个项目中使用它。它的强大之处在于面向对象的方式与数据库进行“交流”,而不是硬编码 SQL 语句,这样管理起来非常麻烦。更不用说,它还更快。
直截了当地回答你的问题,是的!使用SQLAlchemy将CSV中的数据存储到数据库中非常简单。以下是一个完整的工作示例(我使用的是SQLAlchemy 1.0.6和Python 2.7.6):
from numpy import genfromtxt
from time import time
from datetime import datetime
from sqlalchemy import Column, Integer, Float, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

Base = declarative_base()

class Price_History(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Price_History'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    date = Column(Date)
    opn = Column(Float)
    hi = Column(Float)
    lo = Column(Float)
    close = Column(Float)
    vol = Column(Float)

if __name__ == "__main__":
    t = time()

    #Create the database
    engine = create_engine('sqlite:///csv_test.db')
    Base.metadata.create_all(engine)

    #Create the session
    session = sessionmaker()
    session.configure(bind=engine)
    s = session()

    try:
        file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
        data = Load_Data(file_name) 

        for i in data:
            record = Price_History(**{
                'date' : datetime.strptime(i[0], '%d-%b-%y').date(),
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            })
            s.add(record) #Add all the records

        s.commit() #Attempt to commit all the records
    except:
        s.rollback() #Rollback the changes on error
    finally:
        s.close() #Close the connection
    print "Time elapsed: " + str(time() - t) + " s." #0.091s

(注:这不一定是“最佳”方法,但我认为这种格式对于初学者来说非常易读;它也非常快:插入251条记录只需要0.091秒!)
我认为如果你逐行查看它,你会发现使用起来非常轻松。注意缺少SQL语句--万岁!我还利用numpy在两行内加载了CSV内容,但如果你喜欢,也可以不用它。
如果你想与传统的做法进行比较,这里有一个完整的工作示例供参考:
import sqlite3
import time
from numpy import genfromtxt

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


def Create_DB(db):      
    #Create DB and format it as needed
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("CREATE TABLE [Price_History] ([id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE, [date] DATE, [opn] FLOAT, [hi] FLOAT, [lo] FLOAT, [close] FLOAT, [vol] INTEGER);")


def Add_Record(db, data):
    #Insert record into table
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str

        cursor = conn.cursor()

        cursor.execute("INSERT INTO Price_History({cols}) VALUES({vals});".format(cols = str(data.keys()).strip('[]'), 
                    vals=str([data[i] for i in data]).strip('[]')
                    ))


def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skiprows=1, converters={0: lambda s: str(s)})
    return data.tolist()


if __name__ == "__main__":
    t = time.time() 

    db = 'csv_test_sql.db' #Database filename 
    file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv

    data = Load_Data(file_name) #Get data from CSV

    Create_DB(db) #Create DB

    #For every record, format and insert to table
    for i in data:
        record = {
                'date' : i[0],
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            }
        Add_Record(db, record)

    print "Time elapsed: " + str(time.time() - t) + " s." #3.604s

(注意:即使按照“旧”方法,这也绝不是最好的方法,但它非常易读,并且是从SQLAlchemy方法与“旧”方法进行“1对1”转换。)
请注意SQL语句:一个用于创建表格,另一个用于插入记录。此外,注意长SQL字符串比简单类属性添加更加麻烦。到目前为止,您喜欢SQLAlchemy吗?
至于您的外键查询,当然可以。 SQLAlchemy也有这个功能。以下是一个带有外键分配的类属性的示例(假设还从sqlalchemy模块导入了ForeignKey类):
class Asset_Analysis(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Asset_Analysis'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False) 
    fid = Column(Integer, ForeignKey('Price_History.id'))

这个语句将"fid"列指向Price_History表的id列,作为外键。

希望对您有所帮助!


7
我会用旧的方式使用SQL。 - WestCoastProjects
1
这是有用的代码,但如果数据文件包含在示例中会更有帮助。那样它就真正成为了自包含的。 - Faheem Mitha
我还没有检查为什么会发生这种情况,但是genfromtxt返回错误:genfromtxt() got an unexpected keyword argument 'skiprows'。Numpy的版本是1.12.1-3(Debian 9.0)。 - Faheem Mitha
根据 https://docs.scipy.org/doc/numpy/reference/generated/numpy.genfromtxt.html#numpy.genfromtxt,Faheem,请注意 skiprows 已被弃用。请将 skiprows=1 替换为 skip_header=1。我已经修改了我的答案以反映这个变化。 - Manuel J. Diaz
1
这很酷,但这是否意味着数据首先将被读入Python,然后再插入到数据库中?直接使用数据库的批量加载器并完全跳过数据必须通过Python流动的部分不是更快吗?我相信ARA1307的答案解决了这个问题,但它是针对特定数据库的。拥有一个内部使用特定品牌数据库的批量加载命令的SQLAlchemy方法会很好。 - Ben
显示剩余5条评论

22

我曾经遇到过完全相同的问题,并且我发现使用pandas进行两步处理更加容易,这似乎有些矛盾:

import pandas as pd
with open(csv_file_path, 'r') as file:
    data_df = pd.read_csv(file)
data_df.to_sql('tbl_name', con=engine, index=True, index_label='id', if_exists='replace')

请注意,我的方法类似于这个,但是不知何故 Google 把我引到了这个帖子,所以我想分享一下。


3
如果您有一个非常大的文件需要上传到数据库中,该怎么办? - Naufal
对于中等大小的文件,您可以使用create_engine(..., fast_executemany=True)设置 sqlalchemy 引擎,这将加速 pandas 的 to_sql - Nickolay
什么是“engine”? - Dave

8
要使用SQLAlchemy将一个相对较小的CSV文件导入数据库,您可以使用engine.execute(my_table.insert(), list_of_row_dicts),详细描述在SQLAlchemy教程的"执行多个语句"部分中。这有时被称为"executemany"风格的调用,因为它会导致一个executemany DBAPI调用。DB驱动程序可能会执行一个单个的多值INSERT .. VALUES (..), (..), (..)语句,这会减少到DB的往返次数并加快执行速度。 根据SQLAlchemy的FAQ,如果不使用特定于数据库的批量加载方法(如Postgres中的COPY FROM,MySQL中的LOAD DATA LOCAL INFILE等),这就是您可以获得的最快速度。特别是比使用纯ORM(如@Manuel J. Diaz在此处的答案中所述)、bulk_save_objectsbulk_insert_mappings更快。
import csv
from sqlalchemy import create_engine, Table, Column, Integer, MetaData

engine = create_engine('sqlite:///sqlalchemy.db', echo=True)

metadata = MetaData()
# Define the table with sqlalchemy:
my_table = Table('MyTable', metadata,
    Column('foo', Integer),
    Column('bar', Integer),
)
metadata.create_all(engine)
insert_query = my_table.insert()

# Or read the definition from the DB:
# metadata.reflect(engine, only=['MyTable'])
# my_table = Table('MyTable', metadata, autoload=True, autoload_with=engine)
# insert_query = my_table.insert()

# Or hardcode the SQL query:
# insert_query = "INSERT INTO MyTable (foo, bar) VALUES (:foo, :bar)"

with open('test.csv', 'r', encoding="utf-8") as csvfile:
    csv_reader = csv.reader(csvfile, delimiter=',')
    engine.execute(
        insert_query,
        [{"foo": row[0], "bar": row[1]} 
            for row in csv_reader]
    )

1
+1 对于声明“这是最快的方法,而不需要特定于数据库的...”表示赞同 - 但如果有人确实有使用特定的批量加载选项(如COPY FROM等)的选择,那么他们可能应该使用它。 - kevlarr

3

用逗号分隔并带有标题名称的CSV文件转换到PostrgeSQL

  1. 我使用Python csv reader。CSV数据由逗号(,)分隔
  2. 然后将其转换为Pandas DataFrame。列名与您的csv文件中相同。
  3. 最后,使用引擎连接到数据库将DataFrame转换为sql。如果需要替换/追加则if_exists='replace/append'
import csv
import pandas as pd
from sqlalchemy import create_engine

# Create engine to connect with DB
try:
    engine = create_engine(
        'postgresql://username:password@localhost:5432/name_of_base')
except:
    print("Can't create 'engine")

# Get data from CSV file to DataFrame(Pandas)
with open('test.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    columns = ['one', 'two', 'three']
    df = pd.DataFrame(data=reader, columns=columns)

# Standart method of Pandas to deliver data from DataFrame to PastgresQL
try:
    with engine.begin() as connection:
        df.to_sql('name_of_table', con=connection, index_label='id', if_exists='replace')
        print('Done, ok!')
except Exception as e:
        print(e)

1
这是我找到的唯一可行的方法。其他答案没有明确提交游标的连接。这也意味着你在使用现代的Python、SQLAlchemy和显然是Postgres,因为语法使用了COPY ... FROM
没有错误处理,可能不安全,并且使用ORM映射器定义中除主键外的所有列,但对于简单任务来说,它可能足够了。
import io

import sqlalchemy

Base: sqlalchemy.orm.DeclarativeMeta = db.orm.declarative_base()


def upload_to_model_table(
        Model: Base,
        csv_stream: io.IOBase,
        engine: sqlalchemy.engine,
        header=True,
        delimiter=';'
):
    """ It's assumed you're using postgres, otherwise this won't work. """
    fieldnames = ', '.join([
        f'"{col.name}"' for col in Model.__mapper__.columns if not col.primary_key
    ])

    sql = """
    COPY {0} ({1}) FROM stdin WITH (format CSV, header {2}, delimiter '{3}')
    """.format(Model.__tablename__, fieldnames, header, delimiter)

    chunk_size = getattr(csv_stream, "_DEFAULT_CHUNK_SIZE", 1024)
    with engine.connect() as connection:
        cursor = connection.connection.cursor()
        cursor.copy_expert(sql, csv_stream, chunk_size)
        cursor.connection.commit()
        cursor.close()


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接