SQLAlchemy是否有Django中的get_or_create等效功能?

216

我想从数据库中获取一个对象,如果它已经存在(基于提供的参数),则获取它,否则创建它。

Django 的 get_or_create(或者源代码)可以实现这一功能。那么在 SQLAlchemy 中有没有等效的快捷方式呢?

目前,我的实现方式是这样的:

def get_or_create_instrument(session, serial_number):
    instrument = session.query(Instrument).filter_by(serial_number=serial_number).first()
    if instrument:
        return instrument
    else:
        instrument = Instrument(serial_number)
        session.add(instrument)
        return instrument

9
对于那些只想在对象不存在时添加对象的人,请查看 session.merge:https://dev59.com/E2ct5IYBdhLWcg3wKqa9#12298306 - Anton Tarasenko
11个回答

148

参考@WoLpH的解决方案,以下是对我有效的代码(简化版):

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance

有了这个功能,我可以获取或创建我的模型的任何对象。

假设我的模型对象是:

class Country(Base):
    __tablename__ = 'countries'
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

我想获取或创建我的对象,我写:

myCountry = get_or_create(session, Country, name=countryName)

4
如果你和我一样在搜索,以下是创建一行数据的正确解决方案,以防该行不存在。 - Spencer Rathbun
4
你不需要将新实例添加到会话中吗?否则,如果在调用代码中发出session.commit()命令,由于新实例未添加到会话中,因此不会发生任何事情。 - CadentOrange
1
谢谢您。我发现这非常有用,所以我为将来的使用创建了一个代码片段。https://gist.github.com/jangeador/e7221fc3b5ebeeac9a08 - jangeador
18
考虑到您将会把session作为参数传递,最好避免使用commit(或至少只使用flush)。这样可以将Session控制权交给方法调用者,避免过早的提交。另外,使用one_or_none()代替first()可能会更加安全一些。 - exhuma
我同意@exhuma的观点 - 在函数中提交可能会导致意外的冲突和/或性能问题。 - Yaakov Bressler

144

我知道的方法就是这样,没有捷径可走。

当然,你可以将其概括:

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        params = {k: v for k, v in kwargs.items() if not isinstance(v, ClauseElement)}
        params.update(defaults or {})
        instance = model(**params)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

2020更新(仅适用于Python 3.9+)

这是一个更简洁的版本,使用了Python 3.9的新字典合并运算符(|=)

def get_or_create(session, model, defaults=None, **kwargs):
    instance = session.query(model).filter_by(**kwargs).one_or_none()
    if instance:
        return instance, False
    else:
        kwargs |= defaults or {}
        instance = model(**kwargs)
        try:
            session.add(instance)
            session.commit()
        except Exception:  # The actual exception depends on the specific database so we catch all exceptions. This is similar to the official documentation: https://docs.sqlalchemy.org/en/latest/orm/session_transaction.html
            session.rollback()
            instance = session.query(model).filter_by(**kwargs).one()
            return instance, False
        else:
            return instance, True

注意:

与 Django 版本类似,这将捕获重复键约束和类似错误。如果您的 get 或 create 不能保证返回单个结果,则仍可能导致竞争条件。

为了缓解部分问题,您需要在 session.commit() 后添加另一个 one_or_none() 风格的获取操作。除非您还使用 with_for_update() 或可串行化事务模式,否则仍无法完全保证不会出现竞争条件。


2
我认为在您阅读“session.Query(model.filter_by(**kwargs).first()”时,您应该阅读“session.Query(model.filter_by(**kwargs)).first()”。 - pkoch
3
需要在此处加锁,以防止其他线程在该线程有机会创建实例之前创建实例。 - EoghanM
2
@EoghanM: 通常您的会话将是线程局部的,所以这没有关系。SQLAlchemy会话并不意味着是线程安全的。 - Wolph
6
可能是另一个进程同时尝试创建相同的记录。看看Django对get_or_create的实现。它会检查完整性错误,并依赖于唯一约束的正确使用。 - Ivan Virabyan
1
@IvanVirabyan:我认为@EoghanM是在谈论会话实例。在这种情况下,应该在session.add块周围加上try...except IntegrityError: instance = session.Query(...) - Wolph
显示剩余18条评论

60

我一直在解决这个问题,最终得出了一个相当强大的解决方案:

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), False
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        created = getattr(model, create_method, model)(**kwargs)
        try:
            session.add(created)
            session.flush()
            return created, True
        except IntegrityError:
            session.rollback()
            return session.query(model).filter_by(**kwargs).one(), False

我刚刚写了一篇很详细的博客文章,但是以下是我使用它的原因:

  1. 它将解压为一个元组,告诉您对象是否存在。这通常在工作流程中非常有用。

  2. 该函数允许使用带有@classmethod修饰的创建函数(以及特定于它们的属性)。

  3. 当您有多个连接到数据存储的进程时,该解决方案可以防止竞争条件。

编辑:如此博客文章所述,我已将session.commit()更改为session.flush()。请注意,这些决策是特定于使用的数据存储(在本例中为Postgres)。

编辑2:我已更新函数中的默认值为{},因为这是Python中的常见错误。感谢Nigel的评论!如果您想了解此常见错误,请查看此StackOverflow问题此博客文章


2
与Spencer所说的相比,这个解决方案更好,因为它可以防止竞态条件(通过提交/刷新会话来注意),并完美地模拟了Django的操作。 - kiddouk
@kiddouk 不,它并不完美地模仿。Django的get_or_create不是线程安全的,也不是原子性的。此外,Django的get_or_create如果实例被创建则返回True标志,否则返回False标志。 - Kar
1
@Kate,如果你看一下Django的get_or_create函数,它几乎做了完全相同的事情。这个解决方案也返回一个True/False标志来表示对象是被创建还是被获取,并且也不是原子性的。然而,线程安全和原子更新是数据库的问题,而不是Django、Flask或SQLAlchemy的问题,在这个解决方案和Django的解决方案中,都通过数据库上的事务来解决。 - erik
1
假设为新记录提供了非空字段的空值,它将引发IntegrityError。整个事情变得混乱不堪,现在我们不知道实际发生了什么,并且我们会收到另一个错误,即找不到记录。 - rajat
2
这个IntegrityError的情况应该返回False,因为这个客户端没有创建这个对象,对吗? - kevmitch
显示剩余5条评论

14

erik的出色答案的修改版answer

def get_one_or_create(session,
                      model,
                      create_method='',
                      create_method_kwargs=None,
                      **kwargs):
    try:
        return session.query(model).filter_by(**kwargs).one(), True
    except NoResultFound:
        kwargs.update(create_method_kwargs or {})
        try:
            with session.begin_nested():
                created = getattr(model, create_method, model)(**kwargs)
                session.add(created)
            return created, False
        except IntegrityError:
            return session.query(model).filter_by(**kwargs).one(), True
  • 使用嵌套事务仅回滚新项目的添加,而不是全部回滚(请参阅此答案以在SQLite中使用嵌套事务)。
  • 移动create_method。如果创建的对象具有关系,并且通过这些关系分配成员,则会自动将其添加到会话中。例如,创建一个具有user_iduser作为相应关系的book,然后在create_method中执行book.user=<user object>将会把book添加到会话中。这意味着create_method必须在with语句内部才能从可能的回滚中受益。请注意,begin_nested会自动触发刷新。

请注意,如果使用MySQL,则事务隔离级别必须设置为READ COMMITTED而不是REPEATABLE READ,否则此方法将无法正常工作。Django的get_or_create(以及此处)使用相同的策略,另请参阅Django文档


我喜欢这种方法避免了回滚不相关的更改,但是如果会话在同一事务中之前查询过该模型,则使用MySQL默认隔离级别REPEATABLE READ时,IntegrityError重新查询仍可能失败并出现NoResultFound。我能想到的最好的解决方案是在此查询之前调用session.commit(),但这也不理想,因为用户可能不希望这样做。参考答案没有这个问题,因为session.rollback()具有启动新事务的相同效果。 - kevmitch
哦,今天我学到了。把查询放在嵌套事务中会起作用吗?你说得对,在这个函数内部使用commit可能比使用rollback更糟糕,尽管对于特定的用例来说,它是可以接受的。 - Adversus
是的,在嵌套事务中放置初始查询至少使第二个查询有可能工作。但如果用户在同一事务中明确查询了模型,则仍将失败。我已经决定这是可以接受的,用户应该被警告不要这样做,或者捕获异常并决定是否自己commit()。如果我对代码的理解是正确的,这就是Django所做的。 - kevmitch
在django的文档中,他们建议使用READ COMMITTED,因此看起来他们并没有尝试处理这个问题。查看源代码证实了这一点。我不确定我是否理解了你的回复,你的意思是用户应该将他/她的查询放在嵌套事务中吗?我不清楚SAVEPOINT如何影响使用REPEATABLE READ进行读取。如果没有影响,那么情况似乎无法挽救,如果有影响,那么最后一个查询可能会被嵌套? - Adversus
关于“READ COMMITED”的内容很有趣,也许我应该重新考虑不去触碰数据库默认设置的决定。我已经测试过,在REPEATABLE READ中,从查询之前恢复一个SAVEPOINT会使得这个查询好像从未发生过一样。因此,我发现有必要在嵌套事务中将查询语句放在try子句中,以便在IntegrityError异常子句中的查询能够正常工作。 - kevmitch

8

这个SQLAlchemy的技巧非常好用和优雅。

首先要做的是定义一个函数,该函数将给定一个Session来使用,并将一个字典与Session()关联起来,以跟踪当前唯一键。

def _unique(session, cls, hashfunc, queryfunc, constructor, arg, kw):
    cache = getattr(session, '_unique_cache', None)
    if cache is None:
        session._unique_cache = cache = {}

    key = (cls, hashfunc(*arg, **kw))
    if key in cache:
        return cache[key]
    else:
        with session.no_autoflush:
            q = session.query(cls)
            q = queryfunc(q, *arg, **kw)
            obj = q.first()
            if not obj:
                obj = constructor(*arg, **kw)
                session.add(obj)
        cache[key] = obj
        return obj

利用此函数的一个示例是在混合中使用:
class UniqueMixin(object):
    @classmethod
    def unique_hash(cls, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def unique_filter(cls, query, *arg, **kw):
        raise NotImplementedError()

    @classmethod
    def as_unique(cls, session, *arg, **kw):
        return _unique(
                    session,
                    cls,
                    cls.unique_hash,
                    cls.unique_filter,
                    cls,
                    arg, kw
            )

最后创建唯一的get_or_create模型:

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

engine = create_engine('sqlite://', echo=True)

Session = sessionmaker(bind=engine)

class Widget(UniqueMixin, Base):
    __tablename__ = 'widget'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)

    @classmethod
    def unique_hash(cls, name):
        return name

    @classmethod
    def unique_filter(cls, query, name):
        return query.filter(Widget.name == name)

Base.metadata.create_all(engine)

session = Session()

w1, w2, w3 = Widget.as_unique(session, name='w1'), \
                Widget.as_unique(session, name='w2'), \
                Widget.as_unique(session, name='w3')
w1b = Widget.as_unique(session, name='w1')

assert w1 is w1b
assert w2 is not w3
assert w2 is not w1

session.commit()

这个方法更深入地探讨了这个想法,并提供了不同的方法,但我使用这种方法取得了巨大的成功。

3
如果只有一个SQLAlchemy Session对象可以修改数据库,我喜欢这个配方。我可能错了,但如果其他会话(无论是SQLAlchemy还是其他)同时修改数据库,我不认为它能保护免受由其他会话创建的对象在事务进行时可能出现的影响。在这种情况下,我认为依赖于session.add()后刷新并处理异常的解决方案,例如https://dev59.com/t3E85IYBdhLWcg3w64EA#21146492更可靠。 - TrilceAC

4

从语义上来说,最接近的可能是:

def get_or_create(model, **kwargs):
    """SqlAlchemy implementation of Django's get_or_create.
    """
    session = Session()
    instance = session.query(model).filter_by(**kwargs).first()
    if instance:
        return instance, False
    else:
        instance = model(**kwargs)
        session.add(instance)
        session.commit()
        return instance, True

不确定在sqlalchemy中依赖全局定义的Session是否规范,但Django版本没有使用连接,因此...

返回的元组包含实例和一个布尔值,指示该实例是否已创建(即如果我们从数据库中读取实例,则为False)。

Django的get_or_create通常用于确保全局数据可用,因此我尽可能早地提交。


只要使用 scoped_session 创建并跟踪会话,这应该是可行的,它应该实现线程安全的会话管理(这在2014年是否存在?)。 - cowbert

3

我稍微简化了@Kevin的解决方案,以避免将整个函数包装在if/else语句中。这样只有一个return,我认为更加清晰:

def get_or_create(session, model, **kwargs):
    instance = session.query(model).filter_by(**kwargs).first()

    if not instance:
        instance = model(**kwargs)
        session.add(instance)

    return instance

3

2

根据您采用的隔离级别,上述解决方案都可能无法奏效。我找到的最佳解决方案是以下形式的原始 SQL:

INSERT INTO table(f1, f2, unique_f3) 
SELECT 'v1', 'v2', 'v3' 
WHERE NOT EXISTS (SELECT 1 FROM table WHERE f3 = 'v3')

无论隔离级别和并行度如何,这都是具有事务安全性的。
注意:为了使其高效,最好为唯一列创建索引。

这创建了但没有获取。 - Andrew
“WHERE unique_f3” 不是 “WHERE f3”,对吗? - Andrew
是的,对不起打错字了。 - fcracker79
这个问题是否和以下链接所讨论的并发问题相同:https://dev59.com/j1IG5IYBdhLWcg3wkR1z#64070103 和 https://dev59.com/O2865IYBdhLWcg3wfeqU? - Andrew

2

我经常遇到的一个问题是,当一个字段有最大长度限制(比如说STRING(40)),而你想使用长度较大的字符串执行get or create时,上述方案会失败。

基于上述方案,这是我的解决方法:

from sqlalchemy import Column, String

def get_or_create(self, add=True, flush=True, commit=False, **kwargs):
    """

    Get the an entity based on the kwargs or create an entity with those kwargs.

    Params:
        add: (default True) should the instance be added to the session?
        flush: (default True) flush the instance to the session?
        commit: (default False) commit the session?
        kwargs: key, value pairs of parameters to lookup/create.

    Ex: SocialPlatform.get_or_create(**{'name':'facebook'})
        returns --> existing record or, will create a new record

    ---------

    NOTE: I like to add this as a classmethod in the base class of my tables, so that
    all data models inherit the base class --> functionality is transmitted across
    all orm defined models.

    """


    # Truncate values if necessary
    for key, value in kwargs.items():

        # Only use strings
        if not isinstance(value, str):
            continue

        # Only use if it's a column
        my_col = getattr(self.__table__.columns, key)

        if not isinstance(my_col, Column):
            continue

        # Skip non strings again here
        if not isinstance(my_col.type, String):
            continue

        # Get the max length
        max_len = my_col.type.length

        if value and max_len and len(value) > max_len:

            # Update the value
            value = value[:max_len]
            kwargs[key] = value

    # -------------------------------------------------

    # Make the query...
    instance = session.query(self).filter_by(**kwargs).first()

    if instance:
        return instance

    else:
        # Max length isn't accounted for here.
        # The assumption is that auto-truncation will happen on the child-model
        # Or directtly in the db
        instance = self(**kwargs)

    # You'll usually want to add to the session
    if add:
        session.add(instance)

    # Navigate these with caution
    if add and commit:
        try:
            session.commit()
        except IntegrityError:
            session.rollback()

    elif add and flush:
        session.flush()


    return instance

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接