将大型Pandas数据框写入SQL Server数据库

13

我有74个相对较大的Pandas数据帧(大约34,600行和8列),我正在尝试尽快将它们插入到SQL Server数据库中。经过一些研究,我了解到老式的pandas.to_sql函数不适用于将大量数据插入到SQL Server数据库中,而这是我采取的初始方法(非常缓慢-几乎需要一个小时才能完成应用程序,而使用mysql数据库只需约4分钟)。

这篇文章和许多其他StackOverflow帖子在指导我正确的方向上很有帮助,但我遇到了一些问题:

我正在尝试使用SQLAlchemy的Core而不是ORM,原因在上面的链接中有解释。因此,我正在将数据帧转换为字典,使用pandas.to_dict,然后执行execute()insert()

self._session_factory.engine.execute(
    TimeSeriesResultValues.__table__.insert(),
    data)
# 'data' is a list of dictionaries.
问题在于插入操作没有获取到任何值——它们显示为空的括号,并且我收到了以下错误提示:

问题在于插入操作没有获取到任何值——它们显示为空的括号,并且我收到了以下错误提示:

(pyodbc.IntegretyError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...

我传递了一个包含字典的列表,但是列表中的值没有显示出来,我不知道原因所在。

编辑:

这是我参考的示例:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in range(n)]
    )
    print("SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")

使用MySQL数据库时,大约需要4分钟时间。因此,to_sql()是一个可行的解决方案,只是与MySQL相比,MSSQL连接速度较慢?您正在使用哪个ODBC API?数据库服务器是本地还是远程?考虑导入临时表,然后迁移到最终表。 - Parfait
@Parfait:使用to_sql()在MySQL中可以获得可接受的性能,但在MSSQL中不行。我正在使用pyodbc。数据库是远程的,因此在CSV文件中写入然后通过原始SQL代码进行批量插入在这种情况下也不太可行。此外,用户需要批量管理权限才能这样做,这对于此应用程序的用户可能并不总是可能的。 - denvaar
2
考虑绕过ODBC驱动程序,使用严格的Python API - pmyssl。MySQL ODBC API呢?pymysql呢?两者的表结构和数据类型相同吗?记录数量也相同吗?务必进行深入调查。两者都是高级企业RDMS,不应该有如此大的差距(4分钟对比约60分钟)。 - Parfait
2个回答

14

我有些不幸的消息要告诉你,SQLAlchemy实际上没有为SQL Server实现批量导入功能,它实际上只会执行与to_sql相同的缓慢的单个INSERT语句。我建议你最好尝试使用bcp命令行工具编写脚本。以下是我过去使用过的脚本,但不能保证一定有效:

from subprocess import check_output, call
import pandas as pd
import numpy as np
import os

pad = 0.1
tablename = 'sandbox.max.pybcp_test'
overwrite=True
raise_exception = True
server = 'P01'
trusted_connection= True
username=None
password=None
delimiter='|'
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)



def get_column_def_sql(col):
   if col.dtype == object:
      width = col.str.len().max() * (1+pad)
      return '[{}] varchar({})'.format(col.name, int(width)) 
   elif np.issubdtype(col.dtype, float):
      return'[{}] float'.format(col.name) 
   elif np.issubdtype(col.dtype, int):
      return '[{}] int'.format(col.name) 
   else:
      if raise_exception:
         raise NotImplementedError('data type {} not implemented'.format(col.dtype))
      else:
         print('Warning: cast column {} as varchar; data type {} not implemented'.format(col, col.dtype))
         width = col.str.len().max() * (1+pad)
         return '[{}] varchar({})'.format(col.name, int(width)) 

def create_table(df, tablename, server, trusted_connection, username, password, pad):         
    if trusted_connection:
       login_string = '-E'
    else:
       login_string = '-U {} -P {}'.format(username, password)

    col_defs = []
    for col in df:
       col_defs += [get_column_def_sql(df[col])]

    query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))       
    if overwrite == True:
       query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string


    query_file = 'c:\\pybcp_tempqueryfile.sql'
    with open (query_file,'w') as f:
       f.write(query_string)

    if trusted_connection:
       login_string = '-E'
    else:
       login_string = '-U {} -P {}'.format(username, password)

    o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
    if o != 0:
       raise BaseException("Failed to create table")
   # o = call('del {}'.format(query_file), shell=True)


def call_bcp(df, tablename):   
    if trusted_connection:
       login_string = '-T'
    else:
       login_string = '-U {} -P {}'.format(username, password)
    temp_file = 'c:\\pybcp_tempqueryfile.csv'

    #remove the delimiter and change the encoding of the data frame to latin so sql server can read it
    df.loc[:,df.dtypes == object] = df.loc[:,df.dtypes == object].apply(lambda col: col.str.replace(delimiter,'').str.encode('latin'))
    df.to_csv(temp_file, index = False, sep = '|', errors='ignore')
    o = call('bcp sandbox.max.pybcp_test2 in c:\pybcp_tempqueryfile.csv -S "localhost" -T -t^| -r\n -c')

7

1
fast_executemany=True在我的情况下并没有帮助。与其他关系型数据库管理系统相比,写入速度非常慢。 - hui chen
使用df.to_sql()和Azure上的SQL Server数据库,这对我很有效。速度提高了超过10倍。 - Nic
对我也帮助很大...现在Azure SQL DB可以在几秒钟内写入超过36k条记录。 - Peter

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接