INSERT ... ON DUPLICATE KEY UPDATE
操作?我的意思是类似于inserter.insert().execute(list_of_dictionaries)
这样的语法?INSERT ... ON DUPLICATE KEY UPDATE
操作?我的意思是类似于inserter.insert().execute(list_of_dictionaries)
这样的语法?ON DUPLICATE KEY UPDATE
post version-1.2 for MySQL这个功能现在已经内置于SQLAlchemy中,仅适用于MySQL。下面是somada141的最佳解决方案: https://dev59.com/Lmw15IYBdhLWcg3wcbWy#48373874
ON DUPLICATE KEY UPDATE
在SQL语句中的使用如果你想让生成的SQL实际包含ON DUPLICATE KEY UPDATE
,最简单的方法是使用@compiles
修饰符。
关于一个示例的代码(链接来自一个有关主题的好线程on reddit),可以在on github上找到:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if 'append_string' in insert.kwargs:
return s + " " + insert.kwargs['append_string']
return s
my_connection.execute(my_table.insert(append_string = 'ON DUPLICATE KEY UPDATE foo=foo'), my_values)
session.merge(ModelObject)
首先通过发送SELECT
查询(或在本地查找)检查是否存在具有相同主键值的行。如果存在,则设置一个标志,指示ModelObject已经在数据库中,并且SQLAlchemy应该使用UPDATE
查询。请注意,merge比这要复杂得多,但它很好地复制了具有主键的功能。 ON DUPLICATE KEY UPDATE
功能怎么办?不幸的是,SQLAlchemy没有这样的功能。相反,您必须创建类似于Django的 get_or_create()
函数。 另一个StackOverflow答案涵盖了它,我将在此处方便起见粘贴修改后的可工作版本。def get_or_create(session, model, defaults=None, **kwargs):
instance = session.query(model).filter_by(**kwargs).first()
if instance:
return instance
else:
params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
if defaults:
params.update(defaults)
instance = model(**params)
return instance
我应该提到,自从v1.2版本发布以来,SQLAlchemy的“核心”已经有了一个内置的解决方案,可以在此处查看(下面是复制的片段):http://docs.sqlalchemy.org/en/latest/dialects/mysql.html#insert-on-duplicate-key-update-upsert
from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
id='some_existing_id',
data='inserted value')
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data,
status='U'
)
conn.execute(on_duplicate_key_stmt)
values
也接受dict
对象的list
。 - shebadata
?是否类似于 data={'field_1'='value1'}
。谢谢。 - Houman我的方式
import typing
from datetime import datetime
from sqlalchemy.dialects import mysql
class MyRepository:
def model(self):
return MySqlAlchemyModel
def upsert(self, data: typing.List[typing.Dict]):
if not data:
return
model = self.model()
if hasattr(model, 'created_at'):
for item in data:
item['created_at'] = datetime.now()
stmt = mysql.insert(getattr(model, '__table__')).values(data)
for_update = []
for k, v in data[0].items():
for_update.append(k)
dup = {k: getattr(stmt.inserted, k) for k in for_update}
stmt = stmt.on_duplicate_key_update(**dup)
self.db.session.execute(stmt)
self.db.session.commit()
使用方法:
myrepo.upsert([
{
"field11": "value11",
"field21": "value21",
"field31": "value31",
},
{
"field12": "value12",
"field22": "value22",
"field32": "value32",
},
])
根据phsource的回答,针对使用MySQL并完全覆盖相同键数据而不执行DELETE
语句的特定用例,可以使用以下@compiles
修饰的插入表达式:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def append_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if insert.kwargs.get('on_duplicate_key_update'):
fields = s[s.find("(") + 1:s.find(")")].replace(" ", "").split(",")
generated_directive = ["{0}=VALUES({0})".format(field) for field in fields]
return s + " ON DUPLICATE KEY UPDATE " + ",".join(generated_directive)
return s
OR REPLACE
。 def bulk_insert(self,objects,table):
#table: Your table class and objects are list of dictionary [{col1:val1, col2:vale}]
for counter,row in enumerate(objects):
inserter = table.__table__.insert(prefixes=['OR IGNORE'], values=row)
try:
self.db.execute(inserter)
except Exception as E:
print E
if counter % 100 == 0:
self.db.commit()
self.db.commit()
有一个更简单的解决方案:
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def replace_string(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
s = s.replace("INSERT INTO", "REPLACE INTO")
return s
my_connection.execute(my_table.insert(replace_string=""), my_values)
REPLACE INTO
和INSERT ... ON DUPLICATE KEY UPDATE
有不同的功能。 - Dennis S HennenInnoDB
(或任何其他事务性引擎)表上使用这种解决方案通常没有什么用处,因为它会受到大多数FOREIGN KEY
约束的限制。 - NaltharialORM
使用基于on_duplicate_key_update
的upset
函数
class Model():
__input_data__ = dict()
def __init__(self, **kwargs) -> None:
self.__input_data__ = kwargs
self.session = Session(engine)
def save(self):
self.session.add(self)
self.session.commit()
def upsert(self, *, ingore_keys = []):
column_keys = self.__table__.columns.keys()
udpate_data = dict()
for key in self.__input_data__.keys():
if key not in column_keys:
continue
else:
udpate_data[key] = self.__input_data__[key]
insert_stmt = insert(self.__table__).values(**udpate_data)
all_ignore_keys = ['id']
if isinstance(ingore_keys, list):
all_ignore_keys =[*all_ignore_keys, *ingore_keys]
else:
all_ignore_keys.append(ingore_keys)
udpate_columns = dict()
for key in self.__input_data__.keys():
if key not in column_keys or key in all_ignore_keys:
continue
else:
udpate_columns[key] = insert_stmt.inserted[key]
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
**udpate_columns
)
# self.session.add(self)
self.session.execute(on_duplicate_key_stmt)
self.session.commit()
class ManagerAssoc(ORM_Base, Model):
def __init__(self, **kwargs):
self.id = idWorker.get_id()
column_keys = self.__table__.columns.keys()
udpate_data = dict()
for key in kwargs.keys():
if key not in column_keys:
continue
else:
udpate_data[key] = kwargs[key]
ORM_Base.__init__(self, **udpate_data)
Model.__init__(self, **kwargs, id = self.id)
....
# you can call it as following:
manager_assoc.upsert()
manager.upsert(ingore_keys = ['manager_id'])
LAST_INSERT_ID
,这取决于你的innodb自动增量设置和你的表是否有唯一键可能会很有用。这里提供代码以便参考,但如果你觉得有用,请给作者点个赞。from app import db
from sqlalchemy import func
from sqlalchemy.dialects.mysql import insert
def upsert(model, insert_dict):
"""model can be a db.Model or a table(), insert_dict should contain a primary or unique key."""
inserted = insert(model).values(**insert_dict)
upserted = inserted.on_duplicate_key_update(
id=func.LAST_INSERT_ID(model.id), **{k: inserted.inserted[k]
for k, v in insert_dict.items()})
res = db.engine.execute(upserted)
return res.lastrowid
2023年2月更新:SQLAlchemy 2版本最近发布,支持MySQL方言中的on_duplicate_key_update
。非常感谢SQLAlchemy项目的Federico Caselli,在https://github.com/sqlalchemy/sqlalchemy/discussions/9328的讨论中帮助我开发了示例代码。
请参见https://stackoverflow.com/a/75538576/1630244
如果在此处两次发布相同答案(?)是可以的,请看我的小型自包含代码示例:
import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "foo"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()
# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)
# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')
# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
[
{"id": 1, "name": "x"},
{"id": 2, "name": "y"},
{"id": 3, "name": "w"},
{"id": 4, "name": "z"},
]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')
# demonstrate upsert
ups_stmt = mysql.insert(User).values(
[
{"id": 1, "name": "xx"},
{"id": 2, "name": "yy"},
{"id": 3, "name": "ww"},
{"id": 5, "name": "new"},
]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')
我只是使用了普通的SQL语句:
insert_stmt = "REPLACE INTO tablename (column1, column2) VALUES (:column_1_bind, :columnn_2_bind) "
session.execute(insert_stmt, data)
append_string
代码是无法正常工作的(在9.5版本中引入了新的ON CONFLICT [IGNORE|UPDATE]
功能),因为ORM会自动将RETURNING {primary key}
附加到插入语句中,这会导致SQL无效。 - Fake Namefoo=foo
部分是在做什么,我在自己的表格中应该用什么来替换foo
? - nhinkleappend_string
not work getSAWarning: Can't validate argument 'append_string'; can't locate any SQLAlchemy dialect named 'append' % (k, dialect_name)
- wyx