SQLAlchemy ON DUPLICATE KEY UPDATE

52
在SQLAlchemy中,是否有一种优美的方法来执行INSERT ... ON DUPLICATE KEY UPDATE操作?我的意思是类似于inserter.insert().execute(list_of_dictionaries)这样的语法?
11个回答

57

ON DUPLICATE KEY UPDATE post version-1.2 for MySQL

这个功能现在已经内置于SQLAlchemy中,仅适用于MySQL。下面是somada141的最佳解决方案: https://dev59.com/Lmw15IYBdhLWcg3wcbWy#48373874

ON DUPLICATE KEY UPDATE 在SQL语句中的使用

如果你想让生成的SQL实际包含ON DUPLICATE KEY UPDATE,最简单的方法是使用@compiles修饰符。

关于一个示例的代码(链接来自一个有关主题的好线程on reddit),可以在on github上找到:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if 'append_string' in insert.kwargs:
        return s + " " + insert.kwargs['append_string']
    return s


my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)

请注意,在这种方法中,您必须手动创建append_string。您可能可以更改append_string函数,使其自动将插入字符串更改为带有“ON DUPLICATE KEY UPDATE”字符串的插入,但由于懒惰,我不会在此处执行该操作。
ORM中的“ON DUPLICATE KEY UPDATE”功能
SQLAlchemy在其ORM层中不提供与“ON DUPLICATE KEY UPDATE”或“MERGE”或任何其他类似功能的接口。尽管如此,它具有session.merge()函数,只有在关键字是主键时才能复制该功能。 session.merge(ModelObject)首先通过发送SELECT查询(或在本地查找)检查是否存在具有相同主键值的行。如果存在,则设置一个标志,指示ModelObject已经在数据库中,并且SQLAlchemy应该使用UPDATE查询。请注意,merge比这要复杂得多,但它很好地复制了具有主键的功能。
但是,如果您想要在非主键(例如另一个唯一键)上使用 ON DUPLICATE KEY UPDATE 功能怎么办?不幸的是,SQLAlchemy没有这样的功能。相反,您必须创建类似于Django的 get_or_create()函数。 另一个StackOverflow答案涵盖了它,我将在此处方便起见粘贴修改后的可工作版本。
def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
        if defaults:
            params.update(defaults)
        instance = model(**params)
        return instance

1
请注意,在Postgres上,append_string代码是无法正常工作的(在9.5版本中引入了新的ON CONFLICT [IGNORE|UPDATE]功能),因为ORM会自动将RETURNING {primary key}附加到插入语句中,这会导致SQL无效。 - Fake Name
这里的 foo=foo 部分是在做什么,我在自己的表格中应该用什么来替换 foo - nhinkle
append_string not work get SAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name) - wyx
1
请注意,get_or_create示例在并发系统上会出现竞态条件。相反,您应该先尝试插入,捕获键重复的异常并查询结果。 - Korenz

37

我应该提到,自从v1.2版本发布以来,SQLAlchemy的“核心”已经有了一个内置的解决方案,可以在此处查看(下面是复制的片段):http://docs.sqlalchemy.org/en/latest/dialects/mysql.html#insert-on-duplicate-key-update-upsert

from sqlalchemy.dialects.mysql import insert

insert_stmt = insert(my_table).values(
    id='some_existing_id',
    data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

conn.execute(on_duplicate_key_stmt)

1
是的,我应该澄清一下。上面只适用于MySQL,但是例如Postgres现在已经有了这样的功能,使用方法详见 http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#sqlalchemy.dialects.postgresql.dml.Insert.on_conflict_do_update 和 http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#sqlalchemy.dialects.postgresql.dml.Insert.on_conflict_do_nothing。 - somada141
5
如果需要,这也适用于值数组。这意味着values也接受dict对象的list - sheba
能否请您举个具体的例子,说明如何填充 data?是否类似于 data={'field_1'='value1'}。谢谢。 - Houman
2
这对批量插入更新也适用吗?因为我还没有成功地让它工作。 - Kailegh
1
我有一个唯一索引和一个自增的id主键。在我的情况下,id一直在自增,如何解决这个问题? - M.Abulsoud
显示剩余8条评论

2

我的方式

import typing
from datetime import datetime
from sqlalchemy.dialects import mysql

class MyRepository:

    def model(self):
        return MySqlAlchemyModel

    def upsert(self, data: typing.List[typing.Dict]):
        if not data:
            return
        model = self.model()
        if hasattr(model, 'created_at'):
            for item in data:
                item['created_at'] = datetime.now()

        stmt = mysql.insert(getattr(model, '__table__')).values(data)
        for_update = []
        for k, v in data[0].items():
            for_update.append(k)

        dup = {k: getattr(stmt.inserted, k) for k in for_update}
        stmt = stmt.on_duplicate_key_update(**dup)
        self.db.session.execute(stmt)
        self.db.session.commit()

使用方法:

myrepo.upsert([
    {
        "field11": "value11",
        "field21": "value21",
        "field31": "value31",
    },
    {
        "field12": "value12",
        "field22": "value22",
        "field32": "value32",
    },
])

2

根据phsource的回答,针对使用MySQL并完全覆盖相同键数据而不执行DELETE语句的特定用例,可以使用以下@compiles修饰的插入表达式:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def append_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    if insert.kwargs.get('on_duplicate_key_update'):
        fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
        generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
        return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
    return s

这个例子没有很好地转义字段值。你应该使用内置的转义方法:https://dev59.com/hV8f5IYBdhLWcg3wB-7E#25107658 - phsource
1
请注意,在此示例中,我们使用原始“INSERT”中的字段值覆盖字段(指字段名称而不是值),因此不需要转义。显然,使用现在已成为ORM功能的方法更好(除非与“INSERT FROM SELECT”一起使用,因为它不能按预期工作)。 - sheba

1
这取决于您。如果想要替换,则在前缀中传递OR REPLACE
  def bulk_insert(self,objects,table):
    #table: Your table class and objects are list of dictionary [{col1:val1, col2:vale}] 
    for counter,row in enumerate(objects):
        inserter = table.__table__.insert(prefixes=['OR IGNORE'], values=row)
        try:
            self.db.execute(inserter)
        except Exception as E:
            print E
        if counter % 100 == 0:
            self.db.commit()                    
    self.db.commit()

这里的提交间隔可以更改,以加快或减慢速度。

1

有一个更简单的解决方案:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert

@compiles(Insert)
def replace_string(insert, compiler, **kw):
    s = compiler.visit_insert(insert, **kw)
    s = s.replace("INSERT INTO", "REPLACE INTO")
    return s

my_connection.execute(my_table.insert(replace_string=""), my_values)

12
保重。REPLACE INTOINSERT ... ON DUPLICATE KEY UPDATE有不同的功能。 - Dennis S Hennen
3
值得注意的是,该操作会删除行,因此在InnoDB(或任何其他事务性引擎)表上使用这种解决方案通常没有什么用处,因为它会受到大多数FOREIGN KEY约束的限制。 - Naltharial
它与MySql一起工作得很好。话虽如此,该表上没有任何外键。 - algarecu

1

ORM 使用基于on_duplicate_key_updateupset函数

class Model():
    __input_data__ = dict()

    def __init__(self, **kwargs) -> None:
        self.__input_data__ = kwargs
        self.session = Session(engine)

    def save(self):
        self.session.add(self)
        self.session.commit()
    
    def upsert(self, *, ingore_keys = []):
        column_keys = self.__table__.columns.keys()

        udpate_data = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys:
                continue
            else:
                udpate_data[key] = self.__input_data__[key]

        insert_stmt = insert(self.__table__).values(**udpate_data)

        all_ignore_keys = ['id']
        if isinstance(ingore_keys, list):
            all_ignore_keys =[*all_ignore_keys, *ingore_keys]
        else:
            all_ignore_keys.append(ingore_keys)

        udpate_columns = dict()
        for key in self.__input_data__.keys():
            if key not in column_keys or key in all_ignore_keys:
                continue
            else:
                udpate_columns[key] = insert_stmt.inserted[key]
        
        on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
            **udpate_columns
        )
        # self.session.add(self)
        self.session.execute(on_duplicate_key_stmt)
        self.session.commit()


class ManagerAssoc(ORM_Base, Model):
    def __init__(self, **kwargs):
        self.id = idWorker.get_id()
        column_keys = self.__table__.columns.keys()
        udpate_data = dict()
        for key in kwargs.keys():
            if key not in column_keys:
                continue
            else:
                udpate_data[key] = kwargs[key]
        ORM_Base.__init__(self, **udpate_data)
        Model.__init__(self, **kwargs, id = self.id)

   ....
# you can call it as following:
manager_assoc.upsert()
manager.upsert(ingore_keys = ['manager_id'])


抱歉如果这显得过于挑剔,也许你想将“udpate”更正为“update”? - chrisinmtown

1
其他回答已经涵盖了这个问题,但我想提供另一个关于mysql的好例子,我在this gist中发现了它。这也包括使用LAST_INSERT_ID,这取决于你的innodb自动增量设置和你的表是否有唯一键可能会很有用。这里提供代码以便参考,但如果你觉得有用,请给作者点个赞。
from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert

def upsert(model, insert_dict):
    """model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
    inserted = insert(model).values(**insert_dict)
    upserted = inserted.on_duplicate_key_update(
        id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
                               for k, v in insert_dict.items()})
    res = db.engine.execute(upserted)
    return res.lastrowid

1

2023年2月更新:SQLAlchemy 2版本最近发布,支持MySQL方言中的on_duplicate_key_update。非常感谢SQLAlchemy项目的Federico Caselli,在https://github.com/sqlalchemy/sqlalchemy/discussions/9328的讨论中帮助我开发了示例代码。

请参见https://stackoverflow.com/a/75538576/1630244

如果在此处两次发布相同答案(?)是可以的,请看我的小型自包含代码示例:

import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)

# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')

# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')

# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()

users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')

0

我只是使用了普通的SQL语句:

insert_stmt = "REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :columnn_2_bind) "
session.execute(insert_stmt, data)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接