Django: 如何在事务中包装批量更新/插入操作?

5
这是我的用例:
我有多个并行运行的celery任务。
每个任务可以批量创建或更新许多对象。为此,我使用django-bulk
因此,我基本上使用一个非常方便的函数insert_or_update_many
1.它首先执行选择操作。 2.如果找到对象,则更新它们。 3.否则,它会创建它们。
但这引入了并发问题。例如:如果对象在步骤1期间不存在,则将其添加到稍后要插入的对象列表中。但在此期间可能会发生另一个Celery任务已经创建了该对象,并且当尝试执行批量插入(步骤3)时,我会收到重复输入错误的错误消息。
我想我需要将这三个步骤包装在一个“阻塞”块中。 我已经阅读了有关事务的文章,并尝试在with transaction.commit_on_success:块内包装步骤1、2、3。
with transaction.commit_on_success():
    cursor.execute(sql, parameters)
    existing = set(cursor.fetchall())
    if not skip_update:
        # Find the objects that need to be updated
        update_objects = [o for (o, k) in object_keys if k in existing]
        _update_many(model, update_objects, keys=keys, using=using)
    # Find the objects that need to be inserted.
    insert_objects = [o for (o, k) in object_keys if k not in existing]
    # Filter out any duplicates in the insertion
    filtered_objects = _filter_objects(con, insert_objects, key_fields)
    _insert_many(model, filtered_objects, using=using)

但这对我不起作用。我不确定自己是否完全理解了事务。基本上,我需要一个块,在其中可以放置多个操作,并确保没有其他进程或线程正在(写)访问我的数据库资源。
1个回答

10
我需要一个块,可以放置多个操作,确保没有其他进程或线程正在访问(写)我的数据库资源。
Django事务通常不会为您保证这一点。如果您来自计算机科学的其他领域,您自然会认为事务是以这种方式阻塞的,但在数据库世界中,有不同类型的锁,在不同的隔离级别下,每个数据库都有所不同。因此,要确保您的事务执行此操作,您需要了解事务、锁及其性能特征,以及您的数据库提供的控制机制。
然而,让一堆进程都尝试锁定表以进行竞争插入似乎不是一个好主意。如果冲突很少,您可以采用一种乐观锁定形式,如果事务失败,则重试。或者,您可以将所有这些celery任务定向到单个进程(如果您将获取表锁,那么并行化没有性能优势)。
我的建议是先忘掉批量操作,而是使用Django的update_or_create逐行处理。只要您的数据库有防止重复条目的约束(听起来确实是这样),这应该不会出现您上面描述的竞争条件。如果性能确实无法接受,则可以考虑更复杂的选项。
采用乐观并发方法意味着,与其通过获取表锁等方式来防止冲突,不如像正常操作一样进行,如果出现问题,则重试操作。在您的情况下,它可能看起来像这样:
while True:
    try:
        with transaction.atomic():
            # do your bulk insert / update operation
    except IntegrityError:
        pass
    else:
        break

如果遇到竞争条件,会导致IntegrityError错误,transaction.atomic()块将回滚已经进行的任何更改,while循环将强制重试事务(在这里,批量操作现在将查看新存在的行,并标记其进行更新而不是插入)。如果冲突很少,这种方法可以非常有效,但如果冲突频繁,则效果会非常差。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接