Python Pandas与SQLAlchemy中的to_sql:如何加速导出到MS SQL?

82

我的数据框有大约155,000行和12列。 如果我使用dataframe.to_csv导出到csv文件,输出的文件大小为11MB(几乎是瞬间生成)。

但是,如果我使用to_sql方法将其导出到Microsoft SQL Server,需要5到6分钟的时间!没有列是文本:只有int、float、bool和日期。我曾经看到过ODBC驱动程序设置nvarchar(max)会减慢数据传输速度的案例,但这里不可能是这种情况。

有什么建议可以加快导出过程吗?花费6分钟导出11MB的数据使ODBC连接几乎无法使用。

谢谢!

我的代码是:

import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"

engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database)
conn = engine.connect()

metadata = MetaData(conn)

my_data_frame.to_sql(TableName,engine)

我能想到的唯一方法就是仅导出结构,即列名和数据类型,但不包括行,然后将文件导出为CSV格式,并使用类似于导入/导出向导的工具将CSV文件附加到SQL表中。这样我就不必再次定义所有列类型;这很重要,因为导入工具倾向于读取前x行以猜测数据类型,如果前几行都是NULL,则猜测将是错误的。然而,事实仍然存在,即to_sql方法除了用于小型表之外几乎无法使用。您是否在其他数据库中也遇到过这种情况? - Pythonista anonymous
1
我在家里也试过同样的事情,使用在我的电脑上运行的 SQL Server Express 和 Python 将一个 100 万行 x 12 列的随机数数据框转移到 SQL 中需要 2 分钟(CSV 大小为 228MB)。虽然不是非常快,但可以接受。 在连接到几英里外的 SQL Server 的工作 PC 上,即使对比一个更小的文件,也需要 6 分钟。您知道 pandas、SQLAlchemy 或 pyodbc 中是否有任何参数可以加快传输速度吗?我经常使用许多其他工具连接到同一个 SQL Server,但从未遇到这么慢的情况。谢谢! - Pythonista anonymous
1
有人吗?我还验证了pandas.read_sql_table方法相当快。只有写入是慢的,即使写入一个没有约束的表格。有什么想法吗?我不能是唯一一个经历过这种情况的人,但我似乎在网上找不到任何文档... :( - Pythonista anonymous
也许尝试按块大小分割它?例如,通过for循环遍历10000行的块(my_data_frame.to_sql(TableName,engine,chunksize=10000))。 - Gohawks
1
或者只需将数据导出为 CSV,然后使用批量插入(非常快)。您将需要构建一个格式文件,但这可能是值得的。 [链接](https://msdn.microsoft.com/en-us/library/ms188365.aspx) - Gohawks
12个回答

115

最近我遇到了相同的问题,并且想为其他人添加一个答案。to_sql似乎会为每一行发送一次INSERT查询,这使得它非常慢。但是从 0.24.0版本开始,在pandas.to_sql()中有一个method参数,您可以定义自己的插入函数,或者只需使用method='multi'来告诉pandas在单个INSERT查询中传递多行,这样速度就快得多。

请注意,您的数据库可能具有参数限制。在这种情况下,您还必须定义一个块大小。

因此,解决方案应该简单地看起来像这样:

my_data_frame.to_sql(TableName, engine, chunksize=<yourParameterLimit>, method='multi')

如果您不知道您的数据库参数限制,请尝试不使用chunksize参数来运行。它将运行或者给您一个告诉您您的限制的错误信息。


17
在将数据加载到Postgres时,我使用了method='multi',这使得加载速度提高了1000倍 :) 有900k行数据在6小时内无法加载完成,但是当我使用'multi'时,只用了5分钟。谢谢您的提示。 - zwornik
2
一样。这个工作得很好。有人可以在答案中解释一下chunksize吗? - technazi
2
如我在我的答案中所述,一次性发送所有行可能会超出数据库参数限制并导致错误。为了避免这种情况,您可以指定一个“块大小”。这将把插入分为指定块大小的行数块。如果您的数据库具有例如100,000个参数限制,并且您的DataFrame具有1百万行,则除非添加 chunksize=100000,否则会失败。 - NemesisMF
2
为什么这不是默认参数呢 ;-( - LYu
3
如@Fips所述,对于SQL服务器,您需要使用2100 // len(df.columns)来设置块大小(chunksize),如果您不这样做,您可能会收到相当晦涩的错误消息。此外,对于我的具有大量列且进行约10,000次插入的表格而言,并没有提高速度,因此效果因人而异。 - user12861
显示剩余5条评论

21

DataFrame.to_sql方法会将插入语句生成到您的ODBC连接器中,然后由ODBC连接器作为常规插入进行处理。

当这个过程变慢时,不是pandas的问题。

DataFrame.to_sql方法的输出保存到文件中,然后通过ODBC连接器回放这个文件,所需的时间相同。

将数据批量导入数据库的正确方法是生成一个csv文件,然后使用加载命令,在MS SQL数据库中称为BULK INSERT

例如:

BULK INSERT mydatabase.myschema.mytable
FROM 'mydatadump.csv';

语法参考如下:

BULK INSERT 
   [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ] 
      FROM 'data_file' 
     [ WITH 
    ( 
   [ [ , ] BATCHSIZE = batch_size ] 
   [ [ , ] CHECK_CONSTRAINTS ] 
   [ [ , ] CODEPAGE = { 'ACP' | 'OEM' | 'RAW' | 'code_page' } ] 
   [ [ , ] DATAFILETYPE = 
      { 'char' | 'native'| 'widechar' | 'widenative' } ] 
   [ [ , ] FIELDTERMINATOR = 'field_terminator' ] 
   [ [ , ] FIRSTROW = first_row ] 
   [ [ , ] FIRE_TRIGGERS ] 
   [ [ , ] FORMATFILE = 'format_file_path' ] 
   [ [ , ] KEEPIDENTITY ] 
   [ [ , ] KEEPNULLS ] 
   [ [ , ] KILOBYTES_PER_BATCH = kilobytes_per_batch ] 
   [ [ , ] LASTROW = last_row ] 
   [ [ , ] MAXERRORS = max_errors ] 
   [ [ , ] ORDER ( { column [ ASC | DESC ] } [ ,...n ] ) ] 
   [ [ , ] ROWS_PER_BATCH = rows_per_batch ] 
   [ [ , ] ROWTERMINATOR = 'row_terminator' ] 
   [ [ , ] TABLOCK ] 
   [ [ , ] ERRORFILE = 'file_name' ] 
    )] 

8
有兴趣通过Python进行向SQL Server批量插入的人可能对查看我在相关问题中的答案也感兴趣。 - Gord Thompson
这么快...快速 - DonkeyKong

13
你可以使用这个:使它更快的是pandas to_sql的method参数。希望这有所帮助。 这对我来说的结果是从无限时间到8秒。

df = pd.read_csv('test.csv')

conn = create_engine(<connection_string>)

start_time = time.time()
df.to_sql('table_name', conn, method='multi',index=False, if_exists='replace')
print("--- %s seconds ---" % (time.time() - start_time))

此内容截止到2022年3月仍然有效。 - janicebaratheon

8

使用SQLAlchemy>=1.3创建engine对象时,设置fast_executemany=True参考文档


3
「多合一」的解决方案虽然对其他人有用,但并没有帮助到我,不过这个解决方案却行得通。谢谢。 - user12861

6
你可以使用d6tstack,它具有将快速的pandas导入SQL功能,因为它使用本地DB导入命令。它支持MS SQL、Postgres和MYSQL。
uri_psql = 'postgresql+psycopg2://usr:pwd@localhost/db'
d6tstack.utils.pd_to_psql(df, uri_psql, 'table')
uri_mssql = 'mssql+pymssql://usr:pwd@localhost/db'
d6tstack.utils.pd_to_mssql(df, uri_mssql, 'table', 'schema') # experimental

这也适用于导入多个CSV文件,数据模式发生变化和/或在写入数据库之前使用pandas进行预处理,详见示例笔记本中的进一步说明。

d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), 
    apply_after_read=apply_fun).to_psql_combine(uri_psql, 'table')

在连接字符串中,我应该放置用户名和密码?你能给个例子吗? - pyeR_biz
谢谢!我在一个有3000万行的pandas数据框上运行了pd_to_psql,只用了约4分钟。 - pedrostrusso

6

为什么 pandas.DataFrame.to_sql 很慢?

当将数据从 pandas 上传到 Microsoft SQL Server 时,大部分时间实际上是在将从 pandas 转换成 MS SQL ODBC 驱动程序所需的 Python 对象表示形式。 pandas 比基本的 Python 代码更快的原因之一是它使用的是整数/浮点数/... 的精简本地数组,没有与其相应的 Python 对象的同样开销。 to_sql 方法实际上是将所有这些精简列转换为许多单独的 Python 对象,因此它不像其他 pandas 操作一样得到常规的性能处理。

使用 turbodbc.Cursor.insertmanycolumns 可以加速

给定一个 pandas.DataFrame,您可以使用 turbodbcpyarrow 插入数据,比将数据转换为 Python 对象时减少了转换开销。

import pyarrow as pa
import turbodbc

cursor = …  # cursor to a MS SQL connection initiated with turbodbc
df = …  # the pd.DataFrame to be inserted

# Convert the pandas.DataFrame to a pyarrow.Table, most of the columns
# will be zero-copy and thus this is quite fast.
table = pa.Table.from_pandas(table)

# Insert into the database
cursor.executemanycolumns("INSERT INTO my_table VALUES (?, ?, ?)",
                           table)

为什么这更快?

与将 pd.DataFrame 转换为 Python 对象集合,再转换为 ODBC 数据结构不同,我们采用了一种转换路径 pd.DataFrame -> pyarrow.Table -> ODBC 结构。这种方式更有效率,因为:

  • pandas.DataFrame 的大部分列可以转换为 pyarrow.Table 中的列,而不需要复制。表的列将引用相同的内存。因此,实际上不进行任何转换。
  • 转换完全使用本地代码和本地类型完成。这意味着只要没有 object 类型的列,就不会出现 Python 对象的开销。

虽然这是一种敏锐的分析,但我不能使用这个解决方案,因为pyarrow库太大了(50MB)。AWS Lambdas对所有库有250MB的限制,在我们的情况下,我们已经接近这个限制。换句话说,不使用额外库的解决方案总是更受欢迎的。尽管如此,在分析慢性能时,这仍然是一个值得考虑的重要点。 - autonopy

5
对于版本号大于等于1.3的sqlalchemy,不要使用to_sql()的方法参数,而是在sqlalchemy's create_engine()中使用fast_executemany=True。这应该至少和method="multi"一样快,同时避免了T-SQL对存储过程的2100个参数值的限制,这会导致在这里看到的错误。
感谢来自相同链接的Gord Thompson。

1
对于MS ODBC来说,最好将to_sql保留为默认值(不使用multi),并在引擎中使用fast_executemany。 - wordsforthewise

3

这行代码让我时间不够用,而且内存也不够(一个从120MB的CSV文件中加载的DataFrame占用了超过18GB的空间):

df.to_sql('my_table', engine, if_exists='replace', method='multi', dtype={"text_field": db.String(64), "text_field2": db.String(128), "intfield1": db.Integer(), "intfield2": db.Integer(), "floatfield": db.Float()})

这是帮助我同时导入和跟踪插入进度的代码:

import sqlalchemy as db
engine = db.create_engine('mysql://user:password@localhost:3306/database_name', echo=False)
connection = engine.connect()
metadata = db.MetaData()

my_table = db.Table('my_table', metadata,
              db.Column('text_field', db.String(64), index=True),
              db.Column('text_field2', db.String(128), index=True),
              db.Column('intfield1', db.Integer()),
              db.Column('intfield2', db.Integer()),
              db.Column('floatfield', db.Float())
             )
metadata.create_all(engine)
kw_dict = df.reset_index().sort_values(by="intfield2", ascending=False).to_dict(orient="records")

batch_size=10000
for batch_start in range(0, len(kw_dict), batch_size):
    print("Inserting {}-{}".format(batch_start, batch_start + batch_size))
    connection.execute(my_table.insert(), kw_dict[batch_start:batch_start + batch_size])

2
你应该在pd.to_sql中添加chunksize参数,例如5000。正如文档所说,chunksize int, 指定一次写入的批次中的行数。默认情况下,所有行将一次性写入。 - Ferris

2

基于这个答案 - Aseem

您可以使用copy_from方法来模拟使用游标对象进行批量加载。这在Postgres上进行了测试,您可以尝试在您的数据库上使用:

import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, select
from StringIO import StringIO

ServerName = "myserver"
Database = "mydatabase"
TableName = "mytable"

engine = create_engine('mssql+pyodbc://' + ServerName + '/' + Database) #don't forget to add a password if needed

my_data_frame.head(0).to_sql(TableName, engine, if_exists='replace', index=False)  # create an empty table - just for structure
conn = engine.raw_connection()
cur = conn.cursor()
output = StringIO()
my_data_frame.to_csv(output, sep='\t', header=False, index=False) # a CSV that will be used for the bulk load
output.seek(0)
cur.copy_from(output, TableName, null="")  # null values become ''
conn.commit()
conn.close()
cur.close()

2

如果有帮助的话,以下是我解决这个问题的方法。根据我所读的内容,pandas tosql方法一次只加载一条记录。

您可以创建一个批量插入语句,每次加载1000行并提交该事务,而不是每次提交单行。这大大提高了速度。

最初的回答如上,希望能对您有所帮助。

import pandas as pd
from sqlalchemy import create_engine
import pymssql
import os

connect_string  = [your connection string]
engine = create_engine(connect_string,echo=False)
connection = engine.raw_connection()
cursor = connection.cursor()

def load_data(report_name):
    # my report_name variable is also my sql server table name so I use that variable to create table name string
    sql_table_name = 'AR_'+str(report_name)
    global chunk # to QC chunks that fail for some reason
    for chunk in pd.read_csv(report_full_path_new,chunksize=1000):
        chunk.replace('\'','\'\'',inplace=True,regex=True) #replace single quotes in data with double single quotes to escape it in mysql
        chunk.fillna('NULL',inplace=True)

        my_data = str(chunk.to_records(index=False).tolist()) # convert data to string 
        my_data = my_data[1:-1] # clean up the ends
        my_data = my_data.replace('\"','\'').replace('\'NULL\'','NULL') #convert blanks to NULLS for mysql
        sql_table_name = [your sql server table name]

        sql = """
        INSERT INTO {0} 
        VALUES {1}

         """.format(sql_table_name,my_data)

        cursor.execute(sql)
        # you must call commit() to persist your data if you don't set autocommit to True
        connection.commit()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接