I want to load a CSV file into a database.
If your CSV file is large, using INSERT statements is very inefficient. You should use a bulk-loading mechanism instead, and each database has its own. In PostgreSQL, for example, you should use the "COPY FROM" method:
from sqlalchemy import create_engine

with open(csv_file_path, 'r') as f:
    conn = create_engine('postgresql+psycopg2://...').raw_connection()
    cursor = conn.cursor()
    # Stream the file straight into the table via PostgreSQL's COPY
    cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
    cursor.copy_expert(cmd, f)
    conn.commit()
Here is an example that loads a sample CSV through SQLAlchemy's ORM:

from numpy import genfromtxt
from time import time
from datetime import datetime
from sqlalchemy import Column, Integer, Float, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

Base = declarative_base()

class Price_History(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Price_History'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False)
    date = Column(Date)
    opn = Column(Float)
    hi = Column(Float)
    lo = Column(Float)
    close = Column(Float)
    vol = Column(Float)

if __name__ == "__main__":
    t = time()

    #Create the database
    engine = create_engine('sqlite:///csv_test.db')
    Base.metadata.create_all(engine)

    #Create the session
    session = sessionmaker()
    session.configure(bind=engine)
    s = session()

    try:
        file_name = "t.csv" #sample CSV file used: http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
        data = Load_Data(file_name)

        for i in data:
            record = Price_History(**{
                'date' : datetime.strptime(i[0], '%d-%b-%y').date(),
                'opn' : i[1],
                'hi' : i[2],
                'lo' : i[3],
                'close' : i[4],
                'vol' : i[5]
            })
            s.add(record) #Add all the records

        s.commit() #Attempt to commit all the records
    except Exception:
        s.rollback() #Rollback the changes on error
    finally:
        s.close() #Close the connection

    print("Time elapsed: " + str(time() - t) + " s.") #0.091s
For comparison, here is the same load done with sqlite3 directly, inserting one record at a time:

import sqlite3
import time
from numpy import genfromtxt

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

def Create_DB(db):
    #Create DB and format it as needed
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE [Price_History] ([id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE, [date] DATE, [opn] FLOAT, [hi] FLOAT, [lo] FLOAT, [close] FLOAT, [vol] INTEGER);")

def Add_Record(db, data):
    #Insert record into table
    with sqlite3.connect(db) as conn:
        conn.row_factory = dict_factory
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute("INSERT INTO Price_History({cols}) VALUES({vals});".format(
            cols=str(data.keys()).strip('[]'),
            vals=str([data[i] for i in data]).strip('[]')
        ))

def Load_Data(file_name):
    data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
    return data.tolist()

if __name__ == "__main__":
    t = time.time()

    db = 'csv_test_sql.db' #Database filename
    file_name = "t.csv" #sample CSV file used: http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv

    data = Load_Data(file_name) #Get data from CSV
    Create_DB(db) #Create DB

    #For every record, format and insert to table
    for i in data:
        record = {
            'date' : i[0],
            'opn' : i[1],
            'hi' : i[2],
            'lo' : i[3],
            'close' : i[4],
            'vol' : i[5]
        }
        Add_Record(db, record)

    print("Time elapsed: " + str(time.time() - t) + " s.") #3.604s
If you want to link this table to another one, you can add a ForeignKey column, for example:

from sqlalchemy import ForeignKey

class Asset_Analysis(Base):
    #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
    __tablename__ = 'Asset_Analysis'
    __table_args__ = {'sqlite_autoincrement': True}
    #tell SQLAlchemy the name of column and its attributes:
    id = Column(Integer, primary_key=True, nullable=False)
    fid = Column(Integer, ForeignKey('Price_History.id'))
This statement points the "fid" column at the id column of the Price_History table, making it a foreign key.
Hope this helps!
genfromtxt returns the error: genfromtxt() got an unexpected keyword argument 'skiprows'. The numpy version is 1.12.1-3 (Debian 9.0). - Faheem Mitha
skiprows has been deprecated. Please replace skiprows=1 with skip_header=1. I have updated my answer to reflect this change. - Manuel J. Diaz
I had exactly the same problem, and, somewhat paradoxically, I found it easier to do it as a two-step process with pandas:
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://...')  # any SQLAlchemy engine pointing at your target DB
with open(csv_file_path, 'r') as file:
    data_df = pd.read_csv(file)
data_df.to_sql('tbl_name', con=engine, index=True, index_label='id', if_exists='replace')
Note that my approach is similar to this one, but somehow Google sent me to this thread instead, so I thought I would share it.
You can use engine.execute(my_table.insert(), list_of_row_dicts), described in detail in the "Executing Multiple Statements" section of the SQLAlchemy tutorial. This is sometimes called the "executemany" style of invocation, because it results in a single executemany DBAPI call. The DB driver may then execute a single multi-valued INSERT .. VALUES (..), (..), (..) statement, which reduces the number of round trips to the DB and speeds up execution. It is also faster than the ORM's bulk_save_objects or bulk_insert_mappings (a short sketch of the latter follows the example below).

import csv
from sqlalchemy import create_engine, Table, Column, Integer, MetaData

engine = create_engine('sqlite:///sqlalchemy.db', echo=True)

metadata = MetaData()
# Define the table with sqlalchemy:
my_table = Table('MyTable', metadata,
    Column('foo', Integer),
    Column('bar', Integer),
)
metadata.create_all(engine)
insert_query = my_table.insert()

# Or read the definition from the DB:
# metadata.reflect(engine, only=['MyTable'])
# my_table = Table('MyTable', metadata, autoload=True, autoload_with=engine)
# insert_query = my_table.insert()

# Or hardcode the SQL query:
# insert_query = "INSERT INTO MyTable (foo, bar) VALUES (:foo, :bar)"

with open('test.csv', 'r', encoding="utf-8") as csvfile:
    csv_reader = csv.reader(csvfile, delimiter=',')
    engine.execute(
        insert_query,
        [{"foo": row[0], "bar": row[1]}
         for row in csv_reader]
    )
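For reference, a minimal sketch of the ORM bulk path mentioned above (bulk_insert_mappings), assuming the Price_History model and engine from the earlier answer; the sample row values here are made up:

from datetime import date
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

# One mapping dict per CSV row; keys must match the model's column names
row_dicts = [
    {'date': date(2015, 6, 26), 'opn': 35.9, 'hi': 36.1, 'lo': 35.7, 'close': 36.0, 'vol': 1000.0},
]

session.bulk_insert_mappings(Price_History, row_dicts)  # no full ORM objects are built
session.commit()
session.close()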
A comma-delimited CSV file with header names, loaded into PostgreSQL:
import csv
import pandas as pd
from sqlalchemy import create_engine

# Create engine to connect with DB
try:
    engine = create_engine(
        'postgresql://username:password@localhost:5432/name_of_base')
except Exception:
    print("Can't create engine")

# Get data from CSV file into a DataFrame (pandas)
with open('test.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    columns = ['one', 'two', 'three']
    df = pd.DataFrame(data=reader, columns=columns)

# Standard pandas method to deliver data from a DataFrame to PostgreSQL
try:
    with engine.begin() as connection:
        df.to_sql('name_of_table', con=connection, index_label='id', if_exists='replace')
        print('Done, ok!')
except Exception as e:
    print(e)
With PostgreSQL you can also stream the CSV straight in with COPY ... FROM:

import io
import sqlalchemy
import sqlalchemy.orm

Base: sqlalchemy.orm.DeclarativeMeta = sqlalchemy.orm.declarative_base()

def upload_to_model_table(
        Model: Base,
        csv_stream: io.IOBase,
        engine: sqlalchemy.engine.Engine,
        header=True,
        delimiter=';'
):
    """ It's assumed you're using postgres, otherwise this won't work. """
    # All column names of the model except the (auto-generated) primary key
    fieldnames = ', '.join([
        f'"{col.name}"' for col in Model.__mapper__.columns if not col.primary_key
    ])
    sql = """
    COPY {0} ({1}) FROM stdin WITH (format CSV, header {2}, delimiter '{3}')
    """.format(Model.__tablename__, fieldnames, header, delimiter)

    chunk_size = getattr(csv_stream, "_DEFAULT_CHUNK_SIZE", 1024)
    with engine.connect() as connection:
        cursor = connection.connection.cursor()
        cursor.copy_expert(sql, csv_stream, chunk_size)
        cursor.connection.commit()
        cursor.close()
copy_from or copy_expert. This solution can insert millions of rows in one go. - Peter Kilczuk
You can replace the first statement's sqlalchemy.create_engine("...").raw_connection() with psycopg2.connect("..."). - bluu
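A minimal sketch of that substitution, assuming psycopg2 is installed and reusing the file and table from the first answer; the DSN is a placeholder:

import psycopg2

# Same COPY approach as the first answer, but connecting with psycopg2 directly
# instead of going through sqlalchemy.create_engine('...').raw_connection()
conn = psycopg2.connect("dbname=mydb user=myuser")  # placeholder DSN
with open(csv_file_path, 'r') as f:
    cursor = conn.cursor()
    cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
    cursor.copy_expert(cmd, f)
conn.commit()
conn.close()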
The COPY ... FROM ... strategy with a StringIO buffer took only 10 seconds and used only 250 MB of memory. - kevlarr
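A rough sketch of that StringIO-buffered COPY, assuming pandas and psycopg2; the CSV path, DSN, table and column names are placeholders:

import io

import pandas as pd
import psycopg2

df = pd.read_csv('test.csv')  # placeholder CSV path

# Dump the frame into an in-memory CSV buffer, then COPY it in a single round trip
buf = io.StringIO()
df.to_csv(buf, index=False, header=False)
buf.seek(0)

conn = psycopg2.connect("dbname=mydb user=myuser")  # placeholder DSN
cursor = conn.cursor()
cursor.copy_expert("COPY tbl_name (col1, col2, col3) FROM STDIN WITH (FORMAT CSV)", buf)
conn.commit()
cursor.close()
conn.close()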