正如@Pylander所指出的那样
Turbodbc是数据摄取的最佳选择,远胜于其他!
我对此感到非常兴奋,以至于在我的github和medium上写了一篇“博客”文章:
请查看https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e
以获取一个可行的示例,并与pandas.to_sql进行比较
长话短说,
使用turbodbc
我在3秒钟内得到了10000行(77列)
使用pandas.to_sql
我需要198秒才能得到相同的10000行(77列)...
以下是我详细的操作步骤
导入内容:
import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time
加载并处理一些数据 - 将我的sample.pkl替换为你自己的:
df = pd.read_pickle('sample.pkl')
df.columns = df.columns.str.strip()
df = df.applymap(str.strip)
df = df.replace('', np.nan)
df = df.dropna(how='all', axis=0)
df = df.dropna(how='all', axis=1)
df = df.replace(np.nan, 'NA')
使用 sqlAlchemy 创建表格
不幸的是,turbodbc 需要大量的工作和 sql 手动劳动来创建表格和向其中插入数据。
幸运的是,Python 是纯粹的快乐,我们可以自动化编写 sql 代码的过程。
第一步是创建接收我们数据的表格。然而,如果您的表格有超过几列,手动编写 sql 代码会很麻烦。在我的情况下,很多表格通常有 240 列!
这就是 sqlAlchemy 和 pandas 可以帮助我们的地方:pandas 不适合写大量行(如此例中的 10000 行),但对于表格的头部只有 6 行呢?这样,我们可以自动化创建表格的过程。
创建 sqlAlchemy 连接:
mydb = 'someDB'
def make_con(db):
"""Connect to a specified db."""
database_connection = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
myuser, mypassword,
myhost, db
)
)
return database_connection
pd_connection = make_con(mydb)
在SQL Server上创建表格
使用pandas + sqlAlchemy,但仅为之前提到的turbodbc做准备。请注意这里的df.head():我们使用pandas + sqlAlchemy仅插入了6行数据以自动创建表格。这将运行得非常快,并且是为了自动化表格创建而完成的。
table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)
现在表已经就位,让我们认真对待这个问题。
Turbodbc连接:
def turbo_conn(mydb):
"""Connect to a specified db - turbo."""
database_connection = turbodbc.connect(
driver='ODBC Driver 17 for SQL Server',
server=myhost,
database=mydb,
uid=myuser,
pwd=mypassword
)
return database_connection
为turbodbc准备SQL命令和数据。让我们通过创新来自动化这个代码生成过程:
def turbo_write(mydb, df, table):
"""Use turbodbc to insert data into sql."""
start = time.time()
colunas = '('
colunas += ', '.join(df.columns)
colunas += ')'
val_place_holder = ['?' for col in df.columns]
sql_val = '('
sql_val += ', '.join(val_place_holder)
sql_val += ')'
sql = f"""
INSERT INTO {mydb}.dbo.{table} {colunas}
VALUES {sql_val}
"""
valores_df = [df[col].values for col in df.columns]
with connection.cursor() as cursor:
cursor.execute(f"delete from {mydb}.dbo.{table}")
connection.commit()
with connection.cursor() as cursor:
try:
cursor.executemanycolumns(sql, valores_df)
connection.commit()
except Exception:
connection.rollback()
print('something went wrong')
stop = time.time() - start
return print(f'finished in {stop} seconds')
使用turbodbc写入数据 - 我在3秒内处理了10000行(77列):
turbo_write(mydb, df.sample(10000), table)
pandas方法比较 - 我处理了相同的10000行(77列)所用时间为198秒...
table = 'pd_testing'
def pandas_comparisson(df, table):
"""Load data using pandas."""
start = time.time()
df.to_sql(table, con=pd_connection, index=False)
stop = time.time() - start
return print(f'finished in {stop} seconds')
pandas_comparisson(df.sample(10000), table)
环境和条件
Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges
请查看
https://erickfis.github.io/loose-code/ 获取此代码的更新!
to_sql
方法的。你现在问的是同一方法中参数错误的问题,这与原问题不再相关 - 据我所知。只是试图遵守我通常看到的SO规范。关于您现在提供的额外信息,也许出现错误是因为已经存在的表的大小不同,因此无法附加(类型错误)?另外,我提供的最后一个代码片段仅用于说明目的,您可能需要进行一些修改。 - hetspookjeecon._init_engine(SET_FAST_EXECUTEMANY_SWITCH=False)
祝你好运。 - hetspookjeeengine = create_engine(sqlalchemy_url, fast_executemany=True)
来处理mssql+pyodbc
方言。也就是说,现在不再需要定义一个函数并使用@event.listens_for(engine, 'before_cursor_execute')
。谢谢。 - Gord Thompson