我的应用程序正在使用SQLAlchemy的作用域会话和声明性样式。这是一个Web应用程序,许多数据库插入操作是由任务调度程序Celery执行的。
通常,在决定插入对象时,我的代码可能会执行以下操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
这里的问题是,由于许多操作都是通过异步工作者完成的,因此可能会出现一个工作人员正在插入带有id=123的,而另一个工作人员正在检查其是否存在的情况。在这种情况下,第二个工作者将尝试插入具有相同主键的行,而SQLAlchemy将引发IntegrityError。
我无法找到一种不错的解决方法,除了替换Session.commit()
:
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in table." % e.params[0])
Session.rollback()
现在,我已经将所有的Session.commit
替换为schema.commit(ignore=True)
,这样即使行没有再次插入也没关系。
对我来说,这似乎非常脆弱,因为涉及到字符串比较。顺便提一下,当出现IntegrityError
时,它看起来像这样:
(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")
如果我插入的主键是像 Duplicate entry is a cool thing
这样的字符串,那么我可能会错过一些实际上不是因为重复主键而引发的IntegrityError
。
有更好的方法吗?要保持我正在使用的干净的SQLAlchemy方法(而不是开始编写字符串语句等)。
数据库是MySQL(尽管在单元测试中,我喜欢使用SQLite,并且不想用任何新方法来阻碍这种能力)。
谢谢!