使用pyODBC的fast_executemany加速pandas.DataFrame.to_sql

77

我想要将一个大型的pandas.DataFrame发送到运行MS SQL的远程服务器。目前我的做法是将data_frame对象转换为元组列表,然后使用pyODBC的executemany()函数发送数据。具体操作如下:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

我开始思考是否可以通过使用 data_frame.to_sql() 方法来加快(或至少使其更易读)。我想出了以下解决方案:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

现在代码更易读了,但上传速度慢了至少150倍...

在使用SQLAlchemy时,是否有一种方法可以翻转fast_executemany

我使用的是pandas-0.20.3、pyODBC-4.0.21和sqlalchemy-1.1.13。

9个回答

87

编辑(2019-03-08): Gord Thompson在下面的评论中提到,来自sqlalchemy更新日志的好消息:自SQLAlchemy 1.3.0发布于2019-03-04以来,sqlalchemy现在支持engine=create_engine(sqlalchemy_url,fast_executemany=True)用于mssql+pyodbc方言。也就是说,不再需要定义一个函数并使用@event.listens_for(engine,'before_cursor_execute')这意味着下面的函数可以被移除,只需要在create_engine语句中设置标志 - 就可以保留加速。

原帖:

我为了发表这篇文章而注册了账户。我想在上面的主题下发表评论,因为这是对已经提供答案的跟进。上面的解决方案适用于我在基于Ubuntu的安装上使用版本17 SQL驱动程序连接微软SQL存储时的情况。

我用来加快速度(让速度提高了100倍以上)的完整代码如下。只需更改连接字符串以符合您的相关细节即可获得一键式片段。向上面的帖子发帖人表示感谢,因为我已经寻找了相当长的时间。

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True


table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))


s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

基于下面的评论,我想花点时间解释一下pandas to_sql 的实现以及查询处理方式的一些限制。根据我的了解,有2件事可能会导致出现MemoryError错误:

1)假设您正在写入远程SQL存储。当您尝试使用to_sql方法写入大型pandas DataFrame时,它会将整个DataFrame转换为值列表。这种转换占用的RAM比原始DataFrame多得多(除此之外,由于旧的DataFrame仍然存在于RAM中)。将此列表提供给ODBC连接器的最终executemany调用。我认为ODBC连接器在处理如此大的查询时可能会遇到一些问题。解决此问题的方法是为to_sql方法提供一个chunksize参数(10 ** 5似乎是最佳选择,在Azure的2 CPU 7GB RAM MSSQL存储应用程序中可提供大约600 mbit / s的写入速度 - 不能推荐Azure)。因此,第一个限制,即查询大小,可以通过提供chunksize参数来避免。但是,无法在VM上写入像10 ** 7或更大的DataFrame(至少在我正在使用的VM上,其RAM大小约为55GB),这是问题编号2。

这可以通过使用np.split将DataFrame拆分为10 ** 6大小的DataFrame块来避免。可以迭代地将这些块写入。当我有一个针对pandas核心中to_sql方法的解决方案时,我将尝试提交拉取请求,以便您无需每次都进行预分解。无论如何,我最终编写了类似(不是完全适用)于以下函数的函数:

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True

可以在此处查看以上代码片段的更完整示例:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

这是我编写的一个类,它包含补丁并简化了设置与 SQL 连接所需的一些必要开销。仍需要编写一些文档。同时我计划将该补丁贡献给 pandas,但尚未找到合适的方法。

希望这能有所帮助。


1
我认为这与原问题有关系,因为原问题是关于加速to_sql方法的。你现在问的是同一方法中参数错误的问题,这与原问题不再相关 - 据我所知。只是试图遵守我通常看到的SO规范。关于您现在提供的额外信息,也许出现错误是因为已经存在的表的大小不同,因此无法附加(类型错误)?另外,我提供的最后一个代码片段仅用于说明目的,您可能需要进行一些修改。 - hetspookjee
2
不确定为什么以前没有分享过这个,但这是我经常用来在SQL数据库中获取和导出数据框的类:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py享受吧! - hetspookjee
2
@erickfis 我已经更新了这个类,并提供了一个合适的示例。请注意,不是每个数据库都使用相同的驱动程序,因此在使用此类时可能会引发错误。一个不使用此驱动程序的示例数据库是PostgreSQL。我还没有找到一种快速的方法来将数据插入PSQL中。仍然可以通过显式关闭开关来使用此类,方法是在初始化类后调用:con._init_engine(SET_FAST_EXECUTEMANY_SWITCH=False)祝你好运。 - hetspookjee
10
@hetspookjee - 由于这是迄今为止最流行的答案,请考虑更新它以提及SQLAlchemy 1.3.0,发布于2019-03-04,现在支持使用engine = create_engine(sqlalchemy_url, fast_executemany=True)来处理mssql+pyodbc方言。也就是说,现在不再需要定义一个函数并使用@event.listens_for(engine, 'before_cursor_execute')。谢谢。 - Gord Thompson
1
感谢Gord Thompson的更新!我已将您的评论置于顶部,并将我的帖子制作成社区维基文章以供未来更新。 - hetspookjee
显示剩余20条评论

42

在联系了SQLAlchemy的开发人员后,出现了解决此问题的方法。非常感谢他们所做的出色工作!

必须使用游标执行事件并检查是否已引发executemany标志。 如果确实如此,则打开fast_executemany选项。例如:

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

想了解有关执行事件的更多信息,请 点击这里


更新:pyodbcfast_executemany支持已在SQLAlchemy 1.3.0中添加,因此不再需要这种方法。


2
非常感谢您为此付出的努力。仅为了明确起见,这个装饰器和函数应该在实例化SQLAlchemy引擎之前声明吗? - Pylander
2
你太客气了。 我在类的构造函数中实例化引擎后立即声明它。 - J.K.
1
这样就不需要使用pyodbc特定的连接代码了吗?只需要在这个函数之后调用to_sql()函数就可以了吗? - OverflowingTheGlass
3
我尝试在函数后直接调用 to_sql,但它并没有加快任何速度。 - OverflowingTheGlass
4
请考虑更新您的答案,提到 SQLAlchemy 1.3.0 (发布于2019-03-04)现在支持 engine = create_engine(sqlalchemy_url, fast_executemany=True) 用于 mssql+pyodbc 语言。也就是说,不再需要定义一个函数并使用 @event.listens_for(engine, 'before_cursor_execute')。谢谢。 - Gord Thompson
显示剩余6条评论

23

我遇到了与您相似的问题,但是我使用的是PostgreSQL。他们刚刚发布了 pandas版本0.24.0,在to_sql函数中增加了一个名为method的新参数,它解决了我的问题。

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

对我来说,上传速度快了100倍。如果你要发送大量数据,我还建议设置chunksize参数。



11

我只是想提供这个完整的示例作为额外的高性能选项,供那些可以使用新的turbodbc库的人使用:http://turbodbc.readthedocs.io/en/latest/

很明显,在pandas的.to_sql()、通过sqlalchemy触发fast_executemany、直接使用元组/列表等pyodbc,甚至尝试使用平面文件进行BULK UPLOAD之间有很多选择。

希望以下内容可以让生活更愉快,随着当前pandas项目的功能演变,或者在将来包括类似turbodbc集成的内容。

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

turbodbc在许多情况下(特别是使用numpy数组时)应该非常快。请注意,将数据框列的底层numpy数组作为参数直接传递给查询非常简单。我也相信这有助于防止创建过多中间对象导致内存消耗剧增。希望这有所帮助!


我会在接下来的几天尝试这个,然后再回来分享我的发现。 - erickfis
1
@erickfis,这对你有帮助吗?很高兴在这里听到你的发现。 - Pylander
嗨,Pylander!我还没有时间尝试,这里很忙。目前我正在使用公司的工具来摄取数据。但是我非常需要在下一个项目中使用它,以便在SQL服务器上摄取大量数据。我看到的主要缺点是我的dfs每个有240列。当使用pd.to_sql时,我不需要担心每一列。然而,pd.to_sql非常慢,甚至到了禁止的程度。使用turbodbc可能是我的解决方案,但是手动输入那240列中的每一个似乎对我来说并不是最优选择(因为有很多不同的df需要被摄取)。 - erickfis
1
我搞定了:太酷了!让我非常兴奋,我在我的Github上写了一篇关于它的“博客”:github - erickfis
2
@erickfis 太棒了!我很高兴最终你发现这对你的需求很有价值,谢谢你链接了你漂亮的演示帖子。它应该会有助于推广这个答案,并将 turbodbc 项目的知名度提高给寻找解决方案的人们。 - Pylander
这个能和pandas的to_sql一起使用吗? - wordsforthewise

7

看起来Pandas 0.23.0和0.24.0 使用多值插入与PyODBC一起,这阻止了快速执行executemany - 每个块只发出一个INSERT ... VALUES ...语句。多值插入块是旧的慢executemany默认值的改进,但在简单测试中,快速executemany方法仍然占优势,更不用说无需手动chunksize计算,这是多值插入所必需的。如果将来没有提供配置选项,则可以通过monkeypatching强制执行旧行为:

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement

未来已经到来,至少在master分支中,可以使用to_sql()的关键字参数method = 来控制插入方法。它默认为None,强制使用executemany方法。传递method ='multi'会导致使用多个值插入。甚至可以用于实现特定于DBMS的方法,例如Postgresql COPY

1
Pandas的开发人员在这个问题上反复思考了一段时间,但最终似乎放弃了多行插入方法,至少对于mssql+pyodbc SQLAlchemy引擎是如此。确实,Pandas 0.23.4让fast_executemany发挥了作用。 - Gord Thompson
2
我还没有检查当前的情况,但在0.24.0版本中已经重新纳入。编辑:至少在“master”分支中仍然存在,但现在可以控制了:https://github.com/pandas-dev/pandas/blob/master/pandas/io/sql.py#L1157。似乎通过传递 to_sql(…,method = None) 应该强制使用 executemany 方法。 - Ilja Everilä
3
...而None是默认值。 - Ilja Everilä

7

正如@Pylander所指出的那样

Turbodbc是数据摄取的最佳选择,远胜于其他!

我对此感到非常兴奋,以至于在我的github和medium上写了一篇“博客”文章: 请查看https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

以获取一个可行的示例,并与pandas.to_sql进行比较

长话短说,

使用turbodbc 我在3秒钟内得到了10000行(77列)

使用pandas.to_sql 我需要198秒才能得到相同的10000行(77列)...

以下是我详细的操作步骤

导入内容:

import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time

加载并处理一些数据 - 将我的sample.pkl替换为你自己的:

df = pd.read_pickle('sample.pkl')

df.columns = df.columns.str.strip()  # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
df = df.replace(np.nan, 'NA')  # turbodbc hates null values...

使用 sqlAlchemy 创建表格

不幸的是,turbodbc 需要大量的工作和 sql 手动劳动来创建表格和向其中插入数据。

幸运的是,Python 是纯粹的快乐,我们可以自动化编写 sql 代码的过程。

第一步是创建接收我们数据的表格。然而,如果您的表格有超过几列,手动编写 sql 代码会很麻烦。在我的情况下,很多表格通常有 240 列!

这就是 sqlAlchemy 和 pandas 可以帮助我们的地方:pandas 不适合写大量行(如此例中的 10000 行),但对于表格的头部只有 6 行呢?这样,我们可以自动化创建表格的过程。

创建 sqlAlchemy 连接:

mydb = 'someDB'

def make_con(db):
    """Connect to a specified db."""
    database_connection = sqlalchemy.create_engine(
        'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
            myuser, mypassword,
            myhost, db
            )
        )
    return database_connection

pd_connection = make_con(mydb)

在SQL Server上创建表格

使用pandas + sqlAlchemy,但仅为之前提到的turbodbc做准备。请注意这里的df.head():我们使用pandas + sqlAlchemy仅插入了6行数据以自动创建表格。这将运行得非常快,并且是为了自动化表格创建而完成的。

table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)

现在表已经就位,让我们认真对待这个问题。

Turbodbc连接:

def turbo_conn(mydb):
    """Connect to a specified db - turbo."""
    database_connection = turbodbc.connect(
                                            driver='ODBC Driver 17 for SQL Server',
                                            server=myhost,
                                            database=mydb,
                                            uid=myuser,
                                            pwd=mypassword
                                        )
    return database_connection

为turbodbc准备SQL命令和数据。让我们通过创新来自动化这个代码生成过程:

def turbo_write(mydb, df, table):
    """Use turbodbc to insert data into sql."""
    start = time.time()
    # preparing columns
    colunas = '('
    colunas += ', '.join(df.columns)
    colunas += ')'

    # preparing value place holders
    val_place_holder = ['?' for col in df.columns]
    sql_val = '('
    sql_val += ', '.join(val_place_holder)
    sql_val += ')'

    # writing sql query for turbodbc
    sql = f"""
    INSERT INTO {mydb}.dbo.{table} {colunas}
    VALUES {sql_val}
    """

    # writing array of values for turbodbc
    valores_df = [df[col].values for col in df.columns]

    # cleans the previous head insert
    with connection.cursor() as cursor:
        cursor.execute(f"delete from {mydb}.dbo.{table}")
        connection.commit()

    # inserts data, for real
    with connection.cursor() as cursor:
        try:
            cursor.executemanycolumns(sql, valores_df)
            connection.commit()
        except Exception:
            connection.rollback()
            print('something went wrong')

    stop = time.time() - start
    return print(f'finished in {stop} seconds')

使用turbodbc写入数据 - 我在3秒内处理了10000行(77列):
turbo_write(mydb, df.sample(10000), table)

pandas方法比较 - 我处理了相同的10000行(77列)所用时间为198秒...

table = 'pd_testing'

def pandas_comparisson(df, table):
    """Load data using pandas."""
    start = time.time()
    df.to_sql(table, con=pd_connection, index=False)
    stop = time.time() - start
    return print(f'finished in {stop} seconds')

pandas_comparisson(df.sample(10000), table)

环境和条件

Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges

请查看 https://erickfis.github.io/loose-code/ 获取此代码的更新!

我也发现pandas速度较慢,但是在一个项目中,我采用了不同的方法来解决这个问题。我的数据分布在多个文件中(13列),但总共有100万行。我使用MySQL INFILE并将文件存储在本地。通过从Python中调用它,并使用线程,我能够在约20秒内导入100万行数据。 - Coffee and Code

6

SQL Server插入性能:pyodbc vs. turbodbc

使用to_sql将pandas DataFrame上传到SQL Server时,启用fast_executemany的pyodbc与turbodbc相比速度差不多。然而,如果没有启用fast_executemany,turbodbc肯定比pyodbc快。

测试环境:

[venv1_pyodbc]
pyodbc 2.0.25

[venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[两者共同]
Python 3.6.4 64位Windows版本
SQLAlchemy 1.3.0b1
pandas 0.23.4
numpy 1.15.4

测试代码:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

每个环境都运行了12次测试,对于每个环境,丢弃最好和最差的一次测试。结果(以秒为单位):

   rank  pyodbc  turbodbc
   ----  ------  --------
      1    22.8      27.5
      2    23.4      28.1
      3    24.6      28.2
      4    25.2      28.5
      5    25.7      29.3
      6    26.9      29.9
      7    27.0      31.4
      8    30.1      32.1
      9    33.6      32.5
     10    39.8      32.9
   ----  ------  --------
average    27.9      30.0

4

我想补充@J.K.的回答。

如果您使用此方法:

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

你遇到了这个错误:

"sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft][SQL Server Native Client 11.0]Function sequence error (0) (SQLParamData)') [SQL: 'INSERT INTO ... (...) VALUES (?, ?)'] [parameters: ((..., ...), (..., ...)] (关于此错误的背景信息请访问: http://sqlalche.me/e/dbapi)"

像这样编码您的字符串值:'yourStringValue'.encode('ascii')

这将解决您的问题。


0

我刚刚修改了引擎代码,帮助我将插入速度提高了100倍。

旧代码 -

import json
import maya
import time
import pandas
import pyodbc
import pandas as pd
from sqlalchemy import create_engine

retry_count = 0
retry_flag = True

hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
print("Read Ok")

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

while retry_flag and retry_count < 5:
  try:
    df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
    retry_flag = False
  except:
    retry_count = retry_count + 1
    time.sleep(30)

修改后的引擎线路 -

原始 -

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

到 -

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)

如果您有任何关于Python到SQL连接的问题,请随时向我提问,我很乐意为您提供帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接