如何使用SqlAlchemy执行upsert操作?

135

如果数据库中不存在一条记录,我希望它被创建,如果该记录已存在(主键已存在),则将其字段更新为当前状态。这通常称为upsert

下面这段不完整的代码片段可以实现此功能,但似乎有些笨重(特别是如果有更多列的情况下)。有没有更好/更佳的方法?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

有没有更好或更简洁的方法来完成这个任务?类似下面这样的东西就很好:

sess.upsert_this(desired_default, unique_key = "name")

虽然unique_key参数显然是不必要的(ORM应该能够轻松地找出这一点),但我添加了它,因为SQLAlchemy倾向于仅使用主键。例如:我一直在研究是否可以使用Session.merge来实现这个目的,但这只适用于主键,在这种情况下,它是一个自增的id,对于此目的并不是非常有用。

一个简单的用例是当启动服务器应用程序时可能已经升级了其默认预期数据。即:对于此upsert,不存在并发问题。


3
如果name字段是唯一的,为什么不能将其设为主键(在这种情况下合并会起作用)?为什么需要一个单独的主键? - abbot
15
@abbot:我不想卷入关于 ID 字段的争论,但是……简单回答一下就是“外键”。更详细点说,虽然名称确实是唯一必需的主键,但存在两个问题。1)当一个模板记录被另一个表中的 5000 万条记录引用时,把该外键作为字符串字段是不可取的。索引整数更好,因此看起来没有意义的 id 列就有了用武之地。2)进一步说,如果使用字符串作为 FK,那么在名称更改时需要更新两个位置,这很麻烦且容易出现关系失效的问题。而 id 永远不会改变。 - Russ
你可以尝试一下新的(beta版)Python upsert库,它兼容psycopg2、sqlite3和MySQLdb。 - Seamus Abshere
11个回答

78

SQLAlchemy支持两种方法的ON CONFLICT, 分别是on_conflict_do_update()on_conflict_do_nothing()

文档中复制:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)

3
MySQL也支持使用on_duplicate_key_update - Michael Berdyshev
2
这段代码我认为是可以的(答案已经有3年了),但也许Michael的评论适用于MySQL。一般来说,我的回答有点草率地得出了使用Postgres作为数据库的结论。这并不好,因为它并没有真正回答所问的通用问题。但基于我得到的赞数,我认为对某些人很有用,所以我把它留了下来。 - P.R.
2
为什么在 set_ 中有 excluded?set_=dict(data=stmt.excluded.data) - Shivam Chaurasia
2
顺便说一下,如果你只想更新所有被排除的列,stmt.excluded是一个ColumnCollection,它可以作为一个映射来使用,所以你可以简单地写成set_=stmt.excluded - Ugtar
2
顺便提一下,如果你只想更新所有被排除的列,stmt.excluded是一个ColumnCollection,它可以作为一个映射使用,所以你可以简单地写成set_=stmt.excluded - undefined
显示剩余5条评论

75

SQLAlchemy确实具有"保存或更新"行为,最近的版本已经将其内置到session.add中,但以前是独立的session.saveorupdate调用。 这不是"upsert",但这可能已经足够满足您的需求。

很好,您正在询问具有多个唯一键的类; 我认为这正是没有单一正确方法的原因。主键也是唯一键。 如果没有唯一约束条件,只有主键,那么这将是一个简单的问题:如果不存在具有给定ID的内容,或者ID为None,则创建一个新记录; 否则,在具有该主键的现有记录中更新所有其他字段。

然而,当存在其他唯一约束时,这种简单方法会出现逻辑问题。 如果要"upsert"对象,并且您对象的主键与现有记录匹配,但另一个唯一列匹配不同的记录,那么该怎么办? 同样,如果主键不匹配任何现有记录,但另一个唯一列匹配现有记录,那么怎么办? 也许有一个特定情况的正确答案,但我认为通常没有唯一正确的答案。

这就是为什么没有内置的"upsert"操作的原因。应用程序必须定义在每种特定情况下意味着什么。


33

现今,SQLAlchemy 提供了两个有用的函数on_conflict_do_nothingon_conflict_do_update。这些函数很有用,但要使用它们就需要从ORM接口转到更底层的接口——SQLAlchemy Core

虽然这两个函数让使用SQLAlchemy语法进行upsert变得不那么困难,但是这些函数远不能提供完整的即插即用的upsert解决方案。

我经常遇到的通用用例是在单个SQL查询/会话执行中上插入大块行。我通常遇到两个问题:

例如,缺少我们已经习惯的更高级别的ORM功能。你不能使用ORM对象,而是必须在插入时提供ForeignKey

我正在使用我编写的函数来处理这两个问题:

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)

12
只有支持原生ON CONFLICT语句的后端才能使用on_conflict功能。因此,只有PostgreSQL可以使用。 - cowbert
7
现在,SQLAlchemy也支持MySQL的"ON DUPLICATE KEY UPDATE"。 - Michael Berdyshev
@Nirlze 看起来缺失了,你提到的两个问题是什么? - Dang Huy Nguyen

14

我采用“三思而后行”的方法:

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

优点是这种方法可以与各种数据库兼容,且易于阅读。缺点是在以下场景中可能存在潜在的"竞争条件"(race condition):
  • 我们从数据库中查询一个switch_command,但没有找到
  • 我们创建一个switch_command
  • 另一个进程或线程创建了一个具有与我们相同主键的switch_command
  • 我们试图提交我们的switch_command

这个问题使用try/catch处理竞态条件。 - Ben
44
Upsert的整个目标是避免这里描述的竞态条件。 - sampierson
@sampierson 我知道,这就是为什么SQLAlchemy让它变得干净和可移植变得困难的原因...我在我的答案中强调了竞争条件。 - Ben

4
有多个答案,这里提供另一个答案(YAA)。其他答案由于涉及元编程而不太易读。这里有一个例子:
  • 使用SQLAlchemy ORM

  • 展示如何在使用on_conflict_do_nothing时,如果没有行则创建一行。

  • 展示如何在不创建新行的情况下更新现有行(如果有的话),使用on_conflict_do_update

  • 将表主键用作constraint

原问题和更多示例详见此处


import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                        backref=orm.backref("pair_state",
                                        lazy="dynamic",
                                        cascade="all, delete-orphan",
                                        single_parent=True, ), )


    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_event_at for a named pair."""
        # Based on the original example of https://dev59.com/xarka4cB1Zd3GeqPib1o#49917004
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )

一种有趣的方法是将功能添加到模型中。我想知道是否可以将其移动到基础模型中,以便其他模型可以继承,并在所有模型中默认可用。 - Liquidgenius
1
我能够根据我的情况进行调整,使用会话和execute。需要from sqlalchemy.dialects.postgresql import insert。谢谢! - Ahmed Fasih

3
下面的内容对于我来说在使用Redshift数据库时运行良好,并且也适用于组合主键约束。
来源:this 只需要对函数def start_engine()中创建SQLAlchemy引擎进行一些修改即可。
from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI', 
    'postgresql://localhost:5432/upsert'))
     connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine


class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])


def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else 
  query.statement.compile
    return compiler(dialect=postgresql.dialect())


def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)


session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])

2
这使得可以通过字符串名称访问基础模型。"Original Answer"翻译成"最初的回答"。
def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://dev59.com/gWgt5IYBdhLWcg3w-ST1
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c


sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(record_dict, table):
    """
    handles updates when there are primary key conflicts

    """
    try:
        self.active_session().add(table(**record_dict))
    except:
        # Here we'll assume the error is caused by an integrity error
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
        # them with it's own code - this should be updated to have
        # explicit error handling for each new db engine

        # <update>add explicit error handling for each db engine</update> 
        active_session.rollback()
        # Query for conflic class, use update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items()
            setattr(c_target_record, k, v)

1

由于生成的默认ID和引用存在问题,导致外键违规错误,例如:

update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".

我们需要在更新字典时排除ID,否则它将始终生成新的默认值。
此外,该方法返回已创建/更新的实体。
from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert


def upsert(session, data, key_columns, model):

    stmt = insert(model).values(data)
    
    # Important to exclude the ID for update!
    exclude_for_update = [model.id.name, *key_columns]
    update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}

    stmt = stmt.on_conflict_do_update(
        index_elements=key_columns,
        set_=update_dict
    ).returning(model)

    orm_stmt = (
        select(model)
        .from_statement(stmt)
        .execution_options(populate_existing=True)
    )

    return session.execute(orm_stmt).scalar()

例子:


class UpsertUser(Base):
    __tablename__ = 'upsert_user'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    name: str = Column(sa.String, nullable=False)
    user_sid: str = Column(sa.String, nullable=False, unique=True)
    house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)


class UpsertHouse(Base):
    __tablename__ = 'upsert_house'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
    admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)

# Usage

upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)

注意:仅在PostgreSQL上进行了测试,但也可以适用于支持ON DUPLICATE KEY UPDATE的其他数据库,例如MySQL。

1

对于我来说,这适用于sqlite3和postgres。尽管它可能在合并的主键约束条件下失败,并且很有可能在添加额外的唯一性约束条件下失败。

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)

0
对于 SQLite,可以在定义 UniqueConstraint 时使用 sqlite_on_conflict ='REPLACE' 选项,并使用 sqlite_on_conflict_unique 来表示单列的唯一约束。然后 session.add 就会像 upsert 一样工作。请参阅官方文档 documentation

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接