在SQLAlchemy(声明式风格)中处理插入时的重复主键

47

我的应用程序正在使用SQLAlchemy的作用域会话和声明性样式。这是一个Web应用程序,许多数据库插入操作是由任务调度程序Celery执行的。

通常,在决定插入对象时,我的代码可能会执行以下操作:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()

这里的问题是,由于许多操作都是通过异步工作者完成的,因此可能会出现一个工作人员正在插入带有id=123的,而另一个工作人员正在检查其是否存在的情况。在这种情况下,第二个工作者将尝试插入具有相同主键的行,而SQLAlchemy将引发IntegrityError。

我无法找到一种不错的解决方法,除了替换Session.commit()

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()

现在,我已经将所有的Session.commit替换为schema.commit(ignore=True),这样即使行没有再次插入也没关系。

对我来说,这似乎非常脆弱,因为涉及到字符串比较。顺便提一下,当出现IntegrityError时,它看起来像这样:

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")

如果我插入的主键是像 Duplicate entry is a cool thing 这样的字符串,那么我可能会错过一些实际上不是因为重复主键而引发的IntegrityError

有更好的方法吗?要保持我正在使用的干净的SQLAlchemy方法(而不是开始编写字符串语句等)。

数据库是MySQL(尽管在单元测试中,我喜欢使用SQLite,并且不想用任何新方法来阻碍这种能力)。

谢谢!


3
为生成主键,为什么不考虑使用自动递增呢?这样就不用担心这个问题了。 或者有特定的原因不这样做吗? - mata
有一个特定的原因(抱歉,这个例子有点琐碎)。 - Edwardr
7个回答

40
如果您使用session.merge(bike)而不是session.add(bike),则不会生成主键错误。 bike将根据需要检索、更新或创建。

9
如果你使用合并操作,但同时在不同的会话中执行两次合并操作,仍然可能会出现完整性错误。 - Sjoerd
1
这个答案在会话适合内存的情况下是不错的,但对于更大的查询就不那么好了。因此,如果您想添加的数据超出了内存限制,您不能只创建一堆会话然后将它们合并,对吧? - elplatt

10

你应该以相同的方式处理每个IntegrityError: 回滚事务,并可选择重新尝试。在发生IntegrityError后,有些数据库甚至不允许你执行更多操作。如果数据库允许,可以在两个冲突事务开始时对表或更细粒度的锁进行加锁。

使用with语句显式地开始一个事务,并在任何异常情况下自动提交(或回滚):

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)

嗨。我并不是故意同时安排插入和检查的。问题在于该对象以临时方式由两个独立的进程创建。这并没有什么不妥之处,这就是应用程序的方式(实际上,这些对象不是自行车,而是时间)。但是,您关于运行单个工作程序的想法是正确的。我正在研究如何指定单个工作程序来管理所有与数据库相关的任务,这将提供我所需的同步性。从应用程序中进行插入不是一个选项。数据库位于远程机器上,我需要低于100毫秒的Web应用程序响应时间。 - Edwardr
设计几乎总是导致这些SQL问题。例如,您确定不能使数据库的主键自动递增并处理偶尔出现的“先前主键列的两行”结果吗? - joeforker
抱歉,我应该补充说明,为什么PK不是自动递增的有很好的理由。我只是不确定我是否同意。数据库由许多其他应用程序共享,包括使用相关表格。如果在你进行了一些尽职调查后,可能会有另一个进程/应用程序/人员插入一行,这为什么是不良设计呢?关键是你必须在你的应用程序中处理它。我的问题只是我能看到的唯一处理方式是通过字符串检查在SQLAlchemy中处理它,而且似乎并不特别健壮。 - Edwardr
哦,你只想处理完整性错误。如果出现任何完整性错误,你可能别无选择,只能回滚操作,查看数据库文档。你还期望其他类型的完整性错误吗?尝试在添加对象后使用session.flush来更早地获取错误。我相信错误的数字ID也作为单独的属性存在其中。 - joeforker
3
在上述代码中,你可能希望将 .first() 更改为 .one(),因为它应该是一个唯一字段。不管怎样,这可能不重要,因为答案中的代码引入了竞态条件。在检查记录是否存在和添加记录之间,另一个工作进程可能已经添加了记录。检查 IntegrityError 并在必要时回滚更安全。 - JosefAssad

4

你需要使用下面提到的代码而不是使用 session.add(obj),这样会更加干净,而且你也不需要像提到的那样使用自定义提交函数。这将忽略冲突,但不仅限于重复键。

mysql:

 self.session.execute(insert(self.table, values=values, prefixes=['IGNORE']))

sqlite

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))

3

我假设你的主键在某种程度上是自然的,这就是为什么你不能依赖于常规的自动增量技术。那么假设问题实际上是需要插入一些唯一列的问题,这更为常见。

如果你想要“尝试插入,部分回滚失败”的功能,你可以使用SAVEPOINT,使用SQLAlchemy时可以使用begin_nested()。接下来的rollback()或commit()仅作用于该SAVEPOINT,而不是正在进行的其他事情。

但是,总的来说,这种模式真的应该避免使用。你真正想要做的有三件事:1. 不要运行涉及需要插入的相同键的并发作业。2. 在处理并发键时以某种方式同步作业。3. 使用某些公共服务来生成此特定类型的新记录,由作业共享(或确保它们在作业运行之前全部设置好)。

如果你考虑一下,#2在任何高度隔离的情况下都会发生。打开两个postgresql会话。会话1:

test=> create table foo(id integer primary key);
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);

会话2:

test=> begin;
BEGIN
test=> insert into foo(id) values(1);

您将看到的是,当具有主键1的行被锁定时,会出现第2个会话块。我不确定MySQL是否足够智能以执行此操作,但这是正确的行为。另一方面,如果您尝试插入不同的主键:
^CCancel request sent
ERROR:  canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q

这种情况下,即使不阻塞,IT技术仍然可以顺利进行。

关键是,如果您正在处理PK/UQ争用的情况,您的celery任务无论如何都会进行序列化,或者至少应该这样做。


1
使用以下代码,您应该能够做任何您想要的事情,不仅限于解决此问题。
class SessionWrapper(Session):
    def commit(self, ignore=True):
        try:
            super(SessionWrapper, self).commit()
        except IntegrityError as e:
            if not ignore:
                raise e
            message = e.args[0]
            if "Duplicate entry" in message:
                logging.info("Error while executing %s.\n%s.", e.statement, message)
        finally:
            super(SessionWrapper, self).close()


def session(self, auto_commit=False):
    session_factory = sessionmaker(class_=SessionWrapper, bind=self.engine, autocommit=auto_commit)
    return scoped_session(session_factory)

Session = session()
s1 = Session()

p = Test(test="xxx", id=1)
s1.add(p)
s1.commit()
s1.close()

0

只需逐个回滚并重试它们,就像那样简单:

try:
    self._session.bulk_insert_mappings(mapper, items)
    self._session.commit()
except IntegrityError:
    self._session.rollback()
    logger.info("bulk inserting rows failed, fallback to insert one-by-one")
    for item in items:
        try:
            self._session.execute(insert(mapper).values(**item))
            self._session.commit()
        except SQLAlchemyError as e:
            logger.error("Error inserting item: %s for %s", item, e)

0

在我的情况下,我在Python中创建了两个单独的字典,并将它们附加到一个列表中,以便稍后进行批量插入。我使用了一个名为item_base的字典,然后向其添加了一些字段,然后将新的字典添加到我的批量插入列表中。

我没有考虑到的是,在Python中,字典是可变对象,当您使用赋值clone = item_base或clone_2 = item_base时,实际上是在内存中创建对同一个字典的引用。您对clone或clone_2所做的任何修改也会影响item_base,因为它们都指向同一个基础字典。

通过使用import copy Python包并添加copy.copy(item_base)来设置我的克隆字典,我能够解决重复主键问题。希望这能帮助其他人避免我经历的头痛。

    def get_charges(self, my_set):
        charges = []
        start_date_str, end_date_str = self.get_start_end_date_str()
        
        for (id, products) in my_set.items():
            item_base = {}
            line_item = self.get_line_item(end_date_str)
                
            if line_item:
                item_base = dict(line_item.__dict__)
            else:
                item_base['start_date'] = start_date_str
                item_base['end_date'] =  end_date_str


            if products['987']:
                clone = copy.copy(item_base)
                clone[ 'amount'         ] = products['987']
                clone[ 'created_date'   ] = datetime.now()
                clone[ 'quantity'       ] = 1
                clone[ 'unit_price'     ] = products['987']
                charges.append( clone )

            if products['123']:
                clone = copy.copy(item_base)
                clone[ 'amount'         ] = products['123']
                clone[ 'created_date'   ] = datetime.now()
                clone[ 'quantity'       ] = 1
                clone[ 'unit_price'     ] = products['123']
                charges.append(clone)
                
        return charges

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接