如何使用SQLAlchemy在PostgreSQL上执行正确的upsert操作?

24

我想使用PostgreSQL 9.5添加的“新”功能,使用SQLAlchemy Core进行upsert操作。虽然它已经实现了,但是我对语法感到很困惑,我无法使其符合我的需求。 以下是一个示例代码,我希望能够执行:

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
class User(Base):
    __tablename__ = 'test'
    a_id = Column('id',Integer, primary_key=True)
    a = Column("a",Integer)

engine = create_engine('postgres://name:password@localhost/test')
User().metadata.create_all(engine)
meta = MetaData(engine)
meta.reflect()
table = Table('test', meta, autoload=True)
conn = engine.connect()

from sqlalchemy.dialects.postgresql import insert as psql_insert
stmt = psql_insert(table).values({
    table.c['id']: bindparam('id'),
    table.c['a']: bindparam('a'),
})
stmt = stmt.on_conflict_do_update(
    index_elements=[table.c['id']],
    set_={'a': bindparam('a')},
)
list_of_dictionary = [{'id':1, 'a':1, }, {'id':2, 'a':2,}]
conn.execute(stmt, list_of_dictionary)
我想插入一组行,如果其中一个ID已经存在,则希望使用我最初要插入的值进行更新。然而,SQLAlchemy抛出了以下错误:
CompileError: bindparam() name 'a' is reserved for automatic usage in the VALUES or SET clause of this insert/update statement.   Please use a name other than column name when using bindparam() with insert() or update() (for example, 'b_a').

虽然这是一个已知的问题(请参见https://groups.google.com/forum/#!topic/sqlalchemy/VwiUlF1cz_o), 但我没有找到任何不需要修改list_of_dictionary的键或列名的适当答案。

我想知道是否有一种构建stmt的方法,以便具有一致的行为,不依赖于变量list_of_dictionary的键是否是插入表的列名(在这些情况下,我的代码可以正常运行而不出错)。

2个回答

34

这个对我来说很有用:

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.inspection import inspect

def upsert(engine, schema, table_name, records=[]):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload=True)

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]

    # assemble base statement
    stmt = postgresql.insert(table).values(records)

    # define dict of non-primary keys for updating
    update_dict = {
        c.name: c
        for c in stmt.excluded
        if not c.primary_key
    }

    # cover case when all columns in table comprise a primary key
    # in which case, upsert is identical to 'on conflict do nothing.
    if update_dict == {}:
        warnings.warn('no updateable columns found for table')
        # we still wanna insert without errors
        insert_ignore(table_name, records)
        return None


    # assemble new statement with 'on conflict do update' clause
    update_stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict,
    )

    # execute
    with engine.connect() as conn:
        result = conn.execute(update_stmt)
        return result

我之前不知道stmt.excluded,但那正是我所需要的。然而,我不明白你为什么要排除主键,set={c.name : c for c in stmt.excluded} 看起来就像预期的一样工作(我不介意“更新”主键,因为它根据定义是相同的值)。 - Trolin
哦,那是一个很好的观点。这会让代码至少更加优雅一些。 - ryantuck
这个能和 execute(query) 一起工作吗?query 在任何地方都没有定义。它应该是 execute(update_stmt) 吗? - zebrainatree
是的,完全正确 - 我已经更新了代码块以反映那个。 - ryantuck
4
insert_ignore是什么,我在你的导入中没有看到这个函数,另外records是什么?它是一个列表还是一个字典? - spitfiredd
抱歉 - 假设insert_ignore()是在文件中定义的一个函数,它会忽略而不是更新冲突的行。records是一个字典列表。 - ryantuck

0

如果有人正在寻找一个对象关系映射(ORM)解决方案,以下内容适用于我:

def upsert(
    sa_sessionmaker: Union[sessionmaker, scoped_session],
    model: DeclarativeMeta,
    get_values: Dict[str, Any],
    update_values: Dict[str, Any],
) -> Any:
    """Upserts (updates if exists, else inserts) a SQLAlchemy model object.

    Note that get_values must uniquely identify a single model object (row) for this
    function to work.

    Args:
        sa_sessionmaker: SQLAlchemy sessionmaker to connect to the database.
        model: Model declarative metadata.
        get_values: Arguments used to try to retrieve an existing object.
        update_values: Desired attributes for the object fetched via get_values, 
            or the new object if nothing was fetched.

    Returns:
        Model object subject to upsert.
    """
    with sa_sessionmaker() as session:
        instance = session.query(model).filter_by(**get_values).one_or_none()
        if instance:
            for attr, new_val in update_values.items():
                setattr(instance, attr, new_val)
        else:
            create_kwargs = get_values | update_values
            session.add(model(**create_kwargs))
        session.commit()
        instance = session.query(model).filter_by(**get_values).one_or_none()
    return instance

几点说明:

  • 如果对象的主键已知,使用 Session.merge() 可能是一个更好的选择,而不是上面的函数。在这个意义上,上面的函数假定主键是未知的(因此不是 get_values 的一部分)
  • sa_sessionmakerSession 对象的工厂(参见 文档
  • model 接受 SQLAlchemy 声明式元数据(即,“表”,请参见 文档
  • 实现上需要 Python >= 3.9。如果您的环境需要先前版本的 Python,请用 create_kwargs = {**get_values, **update_values} 替换 create_kwargs = get_values | update_values

2
这段代码存在竞态条件,因为它在Python中处理get_or_create而不是在数据库中处理。使用这段代码最终会引发sqlalchemy.exc.IntegrityError异常。 - Alan Hamlett

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接