Django批处理/批量更新或创建?

47

我在数据库中有需要定期更新的数据。数据来源会返回当前可用的所有内容,因此会包括数据库中没有的新数据。

当我循环遍历源数据时,如果可能的话,我不想进行数千次单独的写操作。

是否有像update_or_create这样可以批量处理的方法?

一种想法是将update_or_create与手动事务结合使用,但我不确定它是否只是排队执行单独的写操作,还是会将其组合成一个SQL插入语句?

或者同样地,将带有循环内update_or_create的函数放在@commit_on_success()中,可以吗?

在循环过程中,我对数据除了翻译并保存到模型之外,没有进行任何其他操作。在循环过程中,不存在依赖于该模型的情况。


1
我认为大多数SQL服务器中没有单一的更新或创建查询。在postgres 9.5中有一个,但是Django不支持它。事务不会产生“单个”查询。它只是确保如果一个查询失败,则所有查询都将失败。实际上,它会减慢所有查询的速度。 - imposeren
1
更新:我之前对事务的理解是错误的。对于所有操作使用单个事务将加速您的写入速度。至少对于Postgres和SQLite来说是这样的:https://github.com/coderholic/django-cities/pull/85#issuecomment-125177370 - imposeren
6个回答

35
从Django 4.1开始,bulk_create方法通过update_conflicts支持upserts,这是update_or_create的批量等效单查询方式。
class Foo(models.Model):
    a = models.IntegerField(unique=True)
    b = models.IntegerField()

objects = [Foo(1, 1), Foo(1, 2)]

Foo.objects.bulk_create(
    objects, 
    update_conflicts=True,
    unique_fields=['a'],
    update_fields=['b'],
)

12

自从Django添加了bulk_update支持,现在这是有点可能的,尽管您需要为每个批次执行3个数据库调用(获取、批量创建和批量更新)。这里挑战在于如何为通用目的的函数创建一个良好的接口,因为您希望该函数既支持高效查询又支持更新。下面是我实现的一种方法,它专为具有许多公共标识键(可以为空)和一个在批次中变化的标识键的bulk update_or_create而设计。

这是在基本模型上实现的方法,但可以独立使用。这还假定基本模型在模型名为updated_on的auto_now时间戳;如果不是这种情况,则已对假定此处进行代码行的注释以便于修改。

为了将其分批使用,请在调用之前将更新分块。这也是绕过数据的一种方式,其中辅助标识符可以具有少量值之一,而无需更改接口。

class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}
        
        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            
            create_data = {
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)

使用示例:

class Event(BaseModel):
    organization = models.ForeignKey(Organization)
    external_id = models.IntegerField(unique=True)
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)

可能出现的竞争条件

上述代码利用了事务和select-for-update来防止更新时的竞争条件。但是,如果两个线程或进程试图创建具有相同标识符的对象,则可能存在插入时的竞争条件。

简单的解决方法是确保您的common_keys和unique_key的组合是一个由数据库强制执行的唯一性约束(这是此函数的预期使用方式)。这可以通过unique_key引用具有unique=True字段的字段,或者将unique_key与子集common_keys结合起来,并通过UniqueConstraint强制实施唯一性来实现。通过数据库强制实施唯一性保护,如果多个线程尝试执行冲突创建,则除了一个以外的所有线程都将失败,并显示IntegrityError。由于封闭事务,失败的线程不会执行任何更改,可以安全地重试或忽略它们(一个失败的冲突创建可能只被视为第一个创建,然后立即被覆盖)。

如果无法利用唯一性约束,则需要实现自己的并发控件或锁定整个表


非常好!我做了类似的事情,但设计得不像你的那么好。不过我遇到了一个竞态条件的问题。如果您部署了两个此应用程序的实例,并且每个实例都使用相同的数据进行调用,则第一个可能会在第二个执行其SELECT之后进行INSERT,导致任何唯一字段上的完整性错误。最终我锁定了表以防止这种情况发生。 - c6754
1
@c6754 这个竞态条件是为什么这个实现在事务内并且对所有数据使用 select_for_update 的原因。 - Zags
@Zags 也许我的评论是针对PostgreSQL的。但是在PostgreSQL中,select_for_update仅锁定已经存在的行,而不是整个表。因此,您不会像我上面描述的那样被阻止创建相同的行两次。 - c6754
1
@c6754 或许这是用例的不同。当我使用它时,通常会选择数据库唯一字段(例如表的主键),因此其中一个更新将通过违反唯一性约束而失败。你的情况不是这样吗? - Zags
@Zags 问题不在于更新,而在于插入。当有两个应用程序实例运行时,如果两者几乎同时收到相同的请求,则最快的一个有机会在较慢的一个执行现有对象的SELECT之后但较慢的INSERT之前插入新行。因此,较慢的批量创建将由于重复的PK引发IntegrityError,但此处未处理,因此可能会导致500错误。我想可以通过使用“ignore_conflicts”来防止这种情况,但是这样可能会错过更新。 - c6754

3
批处理更新将成为一个upsert命令,就像@imposeren所说,Postgres 9.5提供了这个功能。我认为Mysql 5.7也有这个功能(请参阅http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html),具体取决于您的需求。尽管如此,最简单的方法可能是使用数据库游标。没有问题,当ORM不足以满足要求时,可以使用它。以下伪代码应该可以工作,因此不要只是复制粘贴,但是概念在这里。
class GroupByChunk(object):
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle

def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()   
        for chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.execute_many(upsert_sql, chunk)

这里的假设是:

  • db_results 是某种结果迭代器,可以是列表或字典
  • db_results 中的结果可以直接用于原始 SQL exec 语句中
  • 如果任何批量更新失败,将回滚所有更新。如果您想将其移动到每个块中,请将 with 块向下推一点

2

0

我一直在使用@Zags的答案,我认为这是最好的解决方案。但是我想提醒他代码中的一个小问题。

        update_fields = {"updated_on"}
        updates = []
        for key, obj in existing_objs.items():
            obj.update(unique_key_to_defaults[key], save=False)
            update_fields.update(unique_key_to_defaults[key].keys())
            updates.append(obj)
        if existing_objs:
            cls.objects.bulk_update(updates, update_fields)

如果您正在使用auto_now=True字段,则在使用.update()或bulk_update()进行更新时,它们不会被更新,因为字段"auto_now"将随着.save()操作触发,您可以在文档中阅读到这一点。

如果您有一个auto_now字段F.e:updated_on,最好将其显式添加到unique_key_to_defaults字典中。

"unique_value" : {
        "field1.." : value...,
        "updated_on" : timezone.now()
    }...

0

如果您正在使用低于4版本的Django旧版,您可以应用我的解决方案;但是对于最新版本,您可以按照@LordElrond的建议进行操作。

模型:

from django.db import models

class Product(models.Model):
    RATINGS = (
        (1, '1 star'),
        (2, '2 stars'),
        (3, '3 stars'),
        (4, '4 stars'),
        (5, '5 stars'),
    )

    name = models.CharField(max_length=255, unique=True)
    rating = models.PositiveIntegerField(choices=RATINGS)

工具:

 class ModelUtils():
    def __init__(self, model, datasets, unique_column):
       self.model = model
       self.datasets = datasets
       self.unique_column = unique_column

    def update(self, dataset_ids):
       fields = list(self.datasets[0].keys())
       fields = [field for field in fields if field != 'id']
       existing_datasets = self.model.objects.filter(
           **{self.unique_column + '__in': dataset_ids}
       ).values(**fields)
       if existing_datasets:           
          self.model.objects.bulk_update(
             [self.model(**d) for d in existing_datasets], fields
          )
       existing_dataset_ids = [d[unique_column] for d in existing_datasets]
       return existing_dataset_ids


    def create(self, dataset_ids, existing_dataset_ids):
        new_dataset_ids = set(dataset_ids) - set(existing_dataset_ids)
        new_datasets = [d for d in self.datasets if d[self.unique_column] 
                        not in existing_dataset_ids]
        self.model.objects.bulk_create(
            [self.model(**d) for d in new_datasets]
        )


    def update_or_create(self):        
        dataset_ids = [d[self.unique_column] for d in self.datasets]
        existing_dataset_ids = self.update(dataset_ids)
        new_dataset_ids = self.create(dataset_ids, existing_dataset_ids)

你可以这样运行它:
datasets = [
    {'name': 'apple', 'rating': 1}, 
    {'name': 'orange', 'rating': 2},
    {'name': 'grapes', 'rating': 4},
    {'name': 'mango', 'rating': 3}
]

model_utils = ModelUtils(Product, datasets, 'name')
model_utils.update_or_create(Product, datasets, 'name')

在任何时候,总共会发生3个查询,而不是n个查询(例如,在循环中进行更新和创建时使用get_or_create)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接