如何在使用 pd.DataFrame.to_sql 插入方法进行 PostgreSQL "upsert" 操作时获取主键列?

3
我正在尝试使用COPY修改Pandas插入方法。目的是为了在Postgres数据库中实现"upsert"机制。
我正在使用这个SO答案来创建临时表并将数据复制到其中,然后插入到目标表中。
以下代码可以工作,但我必须明确地将primary_key设置为我的实际表PK。问题是,我能否从此范围内可见的变量中获取PK?
import csv
from io import StringIO
from typing import Iterable

from sqlalchemy.engine.base import Connection
from pandas.io.sql import SQLTable


# Alternative to_sql() *method* for DBs that support COPY FROM
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method
def psql_upsert_copy(table: SQLTable, conn: Connection, keys: Iterable, data_iter: Iterable[tuple]):
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)
        columns = ', '.join(f'"{k}"' for k in keys)
        excluded_columns = ', '.join(f'EXCLUDED."{k}"' for k in keys)

        # is it possible to get it from the table?
        primary_key = ', '.join(['"PK_col_a"', '"PK_col_b"'])

        if table.schema:
            table_name = f'{table.schema}.{table.name}'
        else:
            table_name = table.name
        sql = f'''
        CREATE TEMP TABLE tmp_table
        ON COMMIT DROP
        AS SELECT * FROM {table_name}
        WITH NO DATA;

        COPY tmp_table ({columns}) FROM STDIN WITH CSV;

        INSERT INTO {table_name}
        SELECT *
        FROM tmp_table
        ON CONFLICT ({primary_key}) DO UPDATE
        SET ({columns}) = ({excluded_columns});
        '''
        cur.copy_expert(sql=sql, file=s_buf)

附注:使用方法如下:

df.to_sql(name='orinal_table_name', con=some_psql_db_engine, if_exists='append', index=False, method=psql_upsert_copy)

你是否热衷于使用pandas的COPY插入方法?因为SQLalchemy有一个内置的机制可以很好地进行upsert操作。我过去也尝试过使用to_sql,但最终转向了SQLalchemy,因为它更加容易使用。 - PvG
@PvGelder 不是这样的。只是我比较熟悉pandas,不太熟悉SQLAlchemy ORM。如果您能为我提供一些使用SQLAlchemy构建upsert机制的指导,那就太好了。 - Valery Kustov
同时,我认为可以更改 psql_upsert_copy 的签名,使其接受一个额外的参数 primary_key: str,然后在代码中使用 functools.partial 来填充此参数,然后将函数传递给 df.to_sql()。但也许这已经在该范围内可见了? - Valery Kustov
1个回答

0

你可以使用SQLalchemy做类似这样的事情:

from sqlalchemy import MetaData
from sqlalchemy.dialects.postgresql import insert
import psycopg2
import sqlalchemy


def upsert_data(df, url, schema, table, primarykey):
    insrt_vals = df.to_dict(orient='records')
    engine = sqlalchemy.create_engine(url)
    connect = engine.connect()
    meta = MetaData(bind=engine, schema=schema)
    meta.reflect(bind=engine)
    table_used = meta.tables[table]
    insrt_stmnt = insert(table_used).values(insrt_vals)

    update_columns = {col.name: col for col in insrt_stmnt.excluded if col.name not in (primarykey)}
    upsert_stmt = insrt_stmnt.on_conflict_do_update(index_elements=[primarykey], set_=update_columns)

更多信息可以在这里找到:https://docs.sqlalchemy.org/en/13/dialects/postgresql.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接