使用Django QuerySet以分块方式处理数据库的最佳方法是什么?

6
我正在对数据库中的所有行进行批处理操作。这涉及选择每个模型并对其进行某些操作。将其拆分为块并逐块执行是有意义的。
我目前正在使用Paginator,因为它很方便。这意味着我需要对值进行排序,以便可以按顺序分页。这确实会生成具有orderlimit子句的SQL语句,对于每个块,我认为Postgres可能会对整个表进行排序(尽管我不能声称了解内部情况)。我所知道的是,数据库的CPU占用率达到了大约50%,我认为这太高了,只是在执行select。
在RDMBS/CPU友好的方式下迭代整个表格的最佳方法是什么?
假设在批量操作期间数据库的内容没有更改。
2个回答

7

根据您的描述,您实际上并不关心所处理的行的排序顺序。如果您在表中有主键(我期望是这样的!),那么这种粗略的分区方法将会更加快速

SELECT * FROM tbl WHERE id BETWEEN 0    AND 1000;
SELECT * FROM tbl WHERE id BETWEEN 1001 AND 2000;
...

这对于任何偏移量都是一样的,对于任何大小的表格也几乎是一样的。 检索主键的最小值和最大值,并根据此进行分区:

SELECT min(id), max(id) from tbl; -- then divide in suitable chunks

相较于:
SELECT * FROM tbl ORDER BY id LIMIT 1000;
SELECT * FROM tbl ORDER BY id LIMIT 1000 OFFSET 1000;
...

通常情况下,这种方法速度较慢,因为所有行都需要排序,并且随着偏移量和表格大小的增加,性能会进一步降低。


这假设记录在没有 sort 子句的情况下以相同的顺序返回。这正确吗?此外,如果我在我的 Meta 类中有默认排序,我是否可以在查询中将其删除? - Joe
@Joe: 基本上你会得到相同的记录,但是未排序。如果您的ID空间中存在间隙,则每次调用返回的记录数量可能会少于预期。而使用 LIMIT / OFFSET 您可以获取一定数量的排序行(除表中最后一个调用)。我不知道如何处理“Meta”类,但对于 LIMIT / OFFSET,您 需要 对行进行排序。 - Erwin Brandstetter
Erwin,非常抱歉我没有正确阅读你的答案。你确定这样更快吗?between子句只有在ID已经排序或每次执行整个表扫描时才能正常工作,是这样吗? - Joe
@Joe:表中没有自然顺序。当然,如果行的物理顺序与索引匹配,这可能会加快操作速度,因为需要读取的块较少。您可能会对CLUSTER感兴趣。如果id被索引,那么我的查询将导致索引扫描,但仅当您读取的块是表的一小部分时才会如此。查询规划器决定哪个更快:索引扫描还是序列扫描。只需使用EXPLAIN ANALYZE进行测试,亲自体验一下即可。 - Erwin Brandstetter

5
以下代码实现了Erwin在上面的答案(使用BETWEEN)的Django QuerySet:
一个用于任意Django QuerySet的实用函数如下所示。它默认假定“id”是用于between子句的合适字段。
def chunked_queryset(qs, batch_size, index='id'):
    """
    Yields a queryset split into batches of maximum size 'batch_size'.
    Any ordering on the queryset is discarded.
    """
    qs = qs.order_by()  # clear ordering
    min_max = qs.aggregate(min=models.Min(index), max=models.Max(index))
    min_id, max_id = min_max['min'], min_max['max']
    for i in range(min_id, max_id + 1, batch_size):
        filter_args = {'{0}__range'.format(index): (i, i + batch_size - 1)}
        yield qs.filter(**filter_args)

使用方法如下:

for chunk in chunked_queryset(SomeModel.objects.all(), 20):
    # `chunk` is a queryset
    for item in chunk:
        # `item` is a SomeModel instance
        pass

您可以改变界面,这样您就不需要额外的嵌套循环,而是可以使用for item in chunked_queryset(qs)

def chunked_queryset(qs, batch_size, index='id'):
    """
    Yields a queryset that will be evaluated in batches
    """
    qs = qs.order_by()  # clear ordering
    min_max = qs.aggregate(min=models.Min(index), max=models.Max(index))
    min_id, max_id = min_max['min'], min_max['max']
    for i in range(min_id, max_id + 1, batch_size):
        filter_args = {'{0}__range'.format(index): (i, i + batch_size - 1)}
        for item in qs.filter(**filter_args):
            yield item

如果先前的某些例行程序删除了某些记录并且那些ID不存在,则此方法将无法正确工作。此代码严格基于所有最小和最大ID之间存在的假设。如果有任何ID缺失,它将给出不规则大小的块。 - MohitC
@MohitC 是的,正如文档字符串所述:“将查询集分成最大大小为'batch_size'的批次。”没有确切大小的保证。 - spookylukey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接