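I'm trying to modify the pandas insertion method to use COPY, in order to get an "upsert" mechanism for a Postgres database.
I'm using this SO answer to create a temporary table, copy the data into it, and then insert it into the target table.
The code below works, but I have to set `primary_key` explicitly to my actual table's PK. The question is: can I get the PK from the variables visible in this scope?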
```python
import csv
from io import StringIO
from typing import Iterable

from sqlalchemy.engine.base import Connection
from pandas.io.sql import SQLTable


# Alternative to_sql() *method* for DBs that support COPY FROM
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
def psql_upsert_copy(table: SQLTable, conn: Connection, keys: Iterable, data_iter: Iterable[tuple]):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join(f'"{k}"' for k in keys)
        excluded_columns = ', '.join(f'EXCLUDED."{k}"' for k in keys)

        # is it possible to get it from the table?
        primary_key = ', '.join(['"PK_col_a"', '"PK_col_b"'])

        if table.schema:
            table_name = f'{table.schema}.{table.name}'
        else:
            table_name = table.name

        sql = f'''
        CREATE TEMP TABLE tmp_table
        ON COMMIT DROP
        AS SELECT * FROM {table_name}
        WITH NO DATA;

        COPY tmp_table ({columns}) FROM STDIN WITH CSV;

        INSERT INTO {table_name}
        SELECT *
        FROM tmp_table
        ON CONFLICT ({primary_key}) DO UPDATE
        SET ({columns}) = ({excluded_columns});
        '''
        cur.copy_expert(sql=sql, file=s_buf)
```
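If the goal is to avoid hard-coding the PK altogether, one option is to ask the database for it through SQLAlchemy's runtime inspection API (`Inspector.get_pk_constraint()`). This is a minimal sketch, assuming `conn` is the SQLAlchemy `Connection` that pandas hands to the method (the code above already relies on this via `conn.connection`); `get_primary_key_columns` and `quoted_pk_clause` are hypothetical helper names. As far as I can tell, the `SQLTable` object itself is built from the DataFrame rather than reflected from the database, so it is not a reliable source for the real PK.

```python
from typing import List, Optional

from sqlalchemy import inspect
from sqlalchemy.engine import Connection


def get_primary_key_columns(conn: Connection, table_name: str,
                            schema: Optional[str] = None) -> List[str]:
    """Reflect the PK column names of an existing table (hypothetical helper).

    The PK comes from the database catalog, not from the DataFrame.
    """
    insp = inspect(conn)  # Inspector bound to this connection
    pk = insp.get_pk_constraint(table_name, schema=schema)
    return list(pk["constrained_columns"])


def quoted_pk_clause(pk_columns: List[str]) -> str:
    # Same double-quoted, comma-separated format as the hard-coded primary_key
    return ', '.join(f'"{c}"' for c in pk_columns)
```

Inside `psql_upsert_copy`, the hard-coded line could then become `primary_key = quoted_pk_clause(get_primary_key_columns(conn, table.name, table.schema))`. This costs one extra catalog query per call, and pandas calls the method once per chunk.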
P.S. Usage is as follows:

```python
df.to_sql(name='original_table_name', con=some_psql_db_engine, if_exists='append', index=False, method=psql_upsert_copy)
```
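…change the signature of `psql_upsert_copy` so that it accepts an extra parameter `primary_key: str`, then use `functools.partial` in your code to fill in that parameter, and pass the resulting function to `df.to_sql()`. But maybe the PK is already visible in that scope? - Valery Kustov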
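The workaround suggested in the comment can be sketched as follows. This is a minimal sketch that has not been run against a live Postgres instance; `psql_upsert_copy_pk` and `build_upsert_sql` are hypothetical names, and `primary_key` is taken as a list of column names rather than the pre-quoted `str` the comment suggests. The SQL script is the same one as in the question, just moved into a pure function so it can be checked without a database.

```python
import csv
import functools
from io import StringIO
from typing import Iterable, Sequence


def build_upsert_sql(table_name: str, keys: Sequence[str],
                     primary_key: Sequence[str]) -> str:
    """Build the temp-table + COPY + INSERT ... ON CONFLICT script."""
    columns = ', '.join(f'"{k}"' for k in keys)
    excluded = ', '.join(f'EXCLUDED."{k}"' for k in keys)
    pk = ', '.join(f'"{c}"' for c in primary_key)
    return f'''
        CREATE TEMP TABLE tmp_table
        ON COMMIT DROP
        AS SELECT * FROM {table_name}
        WITH NO DATA;

        COPY tmp_table ({columns}) FROM STDIN WITH CSV;

        INSERT INTO {table_name}
        SELECT *
        FROM tmp_table
        ON CONFLICT ({pk}) DO UPDATE
        SET ({columns}) = ({excluded});
    '''


def psql_upsert_copy_pk(table, conn, keys: Iterable, data_iter,
                        *, primary_key: Sequence[str]):
    """Like psql_upsert_copy, but the PK columns are supplied by the caller."""
    s_buf = StringIO()
    csv.writer(s_buf).writerows(data_iter)  # rows -> CSV text for COPY
    s_buf.seek(0)
    table_name = f'{table.schema}.{table.name}' if table.schema else table.name
    sql = build_upsert_sql(table_name, list(keys), primary_key)
    with conn.connection.cursor() as cur:  # DBAPI (psycopg2) cursor
        cur.copy_expert(sql=sql, file=s_buf)
```

Usage: `df.to_sql(..., method=functools.partial(psql_upsert_copy_pk, primary_key=['PK_col_a', 'PK_col_b']))`. pandas calls the method as `method(pd_table, conn, keys, data_iter)`, so binding `primary_key` as a keyword-only argument leaves those four positional slots untouched.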