为什么遍历一个大 Django QuerySet 会消耗大量的内存?

135

所提到的表格大约包含一千万行。

for event in Event.objects.all():
    print event

这导致内存使用逐渐增加到约4GB左右,此时行会快速打印。而第一行打印之前的长时间延迟让我感到惊讶——我原以为它几乎可以立即打印出来。

我还尝试过Event.objects.iterator(),它的行为方式相同。

我不明白Django正在加载内存中的内容或者为什么会这样做。我期望Django在数据库层面上遍历结果,这意味着结果会以大致恒定的速率被打印出来(而不是等待很长时间后一次性全部打印出来)。

我的理解有误吗?

(我不知道是否相关,但我正在使用PostgreSQL。)


11
在较小的计算机上,这甚至可能会导致 Django shell 或服务器立即显示 "Killed"。 - Stefano
10个回答

128
Nate C 差不多就对了。
来自文档

You can evaluate a QuerySet in the following ways:

  • Iteration. A QuerySet is iterable, and it executes its database query the first time you iterate over it. For example, this will print the headline of all entries in the database:

    for e in Entry.objects.all():
        print e.headline
    
当你第一次进入循环并获得查询集的迭代形式时,会检索出你的1000万行数据,这时你需要等待Django加载数据库行并为每个对象创建对象,然后才能返回可迭代结果。之后,你就拥有了所有内存中的结果,并可以开始迭代。
从文档中的阅读来看,iterator() 只是绕过了QuerySet的内部缓存机制。我认为它可能会做一些逐个处理的事情,但相应地,这将需要对你的数据库进行1000万次单独的访问。也许并不是那么理想。
高效地迭代大型数据集仍然是我们没有完全掌握的事情,但是有一些代码片段或许可以为你的目的提供帮助:

2
感谢 @eternicode 的出色回答。最终,我们转向原始 SQL 进行所需的数据库级迭代。 - davidchambers
3
@eternicode 很好的回答,我也遇到了这个问题。自那时以来,Django 有任何相关更新吗? - Zólyomi István
12
自Django 1.11以来的文档表明,iterator()函数确实使用了服务器端游标。 - Jeff C Johnson

58

可能不是最快或最高效的方法,但作为一个现成的解决方案,为什么不使用django核心的Paginator和Page对象呢?具体文档请参考:https://docs.djangoproject.com/en/dev/topics/pagination/

可以这样使用:

from django.core.paginator import Paginator
from djangoapp.models import model

paginator = Paginator(model.objects.all(), 1000) # chunks of 1000, you can 
                                                 # change this to desired chunk size

for page in range(1, paginator.num_pages + 1):
    for row in paginator.page(page).object_list:
        # here you can do whatever you want with the row
    print "done processing page %s" % page

7
自发布后,现在可以进行一些小的改进。Paginator现在拥有page_range属性,以避免样板代码。如果要寻求最小内存开销,可以使用object_list.iterator(),它不会填充查询集缓存。然后需要使用prefetch_related_objects进行预取。 - Ken Colton

42

Django的默认行为是在查询评估时缓存整个QuerySet的结果。您可以使用QuerySet的iterator方法来避免此缓存:

for event in Event.objects.all().iterator():
    print event

https://docs.djangoproject.com/en/stable/ref/models/querysets/#iterator

iterator() 方法评估查询集并直接从结果读取,而不在 QuerySet 级别上进行缓存。该方法能够提高性能,并且在需要访问一次大量对象的情况下可以显著减少内存使用。请注意,缓存仍然在数据库级别上执行。

对我来说,使用 iterator() 可以减少内存使用,但仍然比我预期的高。使用 mpaf 建议的 paginator 方法可以使用更少的内存,但对于我的测试案例而言速度会慢 2-3 倍。

from django.core.paginator import Paginator

def chunked_iterator(queryset, chunk_size=10000):
    paginator = Paginator(queryset, chunk_size)
    for page in range(1, paginator.num_pages + 1):
        for obj in paginator.page(page).object_list:
            yield obj

for event in chunked_iterator(Event.objects.all()):
    print event

9
对于大量记录,数据库游标的性能表现更好。在Django中确实需要使用原始SQL,但是Django游标与SQL游标不同。
对于您的情况,Nate C建议使用LIMIT - OFFSET方法可能已经足够了。对于大量数据,它比游标慢,因为它必须一遍又一遍地运行相同的查询,并且必须跳过越来越多的结果。

4
弗兰克,那绝对是一个好观点,但能否看到一些代码细节来推动解决方案呢?;-) (嗯,这个问题现在已经很老了……) - Stefano

9

Django没有很好的解决方案来从数据库中获取大型数据项。

import gc
# Get the events in reverse order
eids = Event.objects.order_by("-id").values_list("id", flat=True)

for index, eid in enumerate(eids):
    event = Event.object.get(id=eid)
    # do necessary work with event
    if index % 100 == 0:
       gc.collect()
       print("completed 100 items")

values_list 可以用来获取数据库中所有的id,然后逐个获取每个对象。随着时间的推移,内存中会创建大量的对象,在退出for循环之前不会被垃圾回收。上面的代码在消耗100个元素后进行手动垃圾回收。


流式HttpResponse可以是一个解决方案吗?https://dev59.com/jm_Xa4cB1Zd3GeqP2InG - ratata
2
然而,这将导致数据库中的命中次数与循环次数相等,我很担心。 - raratiru

8
这是来自文档: http://docs.djangoproject.com/en/dev/ref/models/querysets/ 数据库活动直到你执行某些操作以评估queryset才会发生。
因此,当运行“print event”时,查询被触发(根据您的命令,这是全表扫描),并加载结果。 请求所有对象,并且没有方法可以获取第一个对象而不获取所有对象。
但是,如果你做类似这样的事情:
Event.objects.all()[300:900]

然后它会在内部向SQL添加偏移量和限制。 http://docs.djangoproject.com/en/dev/topics/db/queries/#limiting-querysets

7

7

在查询集被迭代之前会消耗大量内存,因为整个查询的所有数据库行都会一次性处理成对象,这可能需要大量的处理时间,具体取决于行数。

您可以将查询集分成更小的可消化块。我称这种模式为“分段处理”。以下是我在管理命令中使用的带有进度条的实现方式,首先需要执行pip3 install tqdm安装。

from tqdm import tqdm


def spoonfeed(qs, func, chunk=1000, start=0):
    """
    Chunk up a large queryset and run func on each item.

    Works with automatic primary key fields.

    chunk -- how many objects to take on at once
    start -- PK to start from

    >>> spoonfeed(Spam.objects.all(), nom_nom)
    """
    end = qs.order_by('pk').last()
    progressbar = tqdm(total=qs.count())
    if not end:
        return
    while start < end.pk:
        for o in qs.filter(pk__gt=start, pk__lte=start+chunk):
            func(o)
            progressbar.update(1)
        start += chunk
    progressbar.close()

使用此功能,您需要编写一个函数来对您的对象进行操作:

def set_population(town):
    town.population = calculate_population(...)
    town.save()

然后在您的查询集上运行该函数:
spoonfeed(Town.objects.all(), set_population)

1
看起来这将被构建到1.12中,使用iterate(chunk_size=1000)。 - Kevin Parker
是的,现在有Model.objects.all().iterator(chunk_size=1000),尽管我听到了一些关于它的抱怨。https://nextlinklabs.com/insights/django-big-data-iteration - fmalina

3

这里提供一种包括len和count的解决方案:

class GeneratorWithLen(object):
    """
    Generator that includes len and count for given queryset
    """
    def __init__(self, generator, length):
        self.generator = generator
        self.length = length

    def __len__(self):
        return self.length

    def __iter__(self):
        return self.generator

    def __getitem__(self, item):
        return self.generator.__getitem__(item)

    def next(self):
        return next(self.generator)

    def count(self):
        return self.__len__()

def batch(queryset, batch_size=1024):
    """
    returns a generator that does not cache results on the QuerySet
    Aimed to use with expected HUGE/ENORMOUS data sets, no caching, no memory used more than batch_size

    :param batch_size: Size for the maximum chunk of data in memory
    :return: generator
    """
    total = queryset.count()

    def batch_qs(_qs, _batch_size=batch_size):
        """
        Returns a (start, end, total, queryset) tuple for each batch in the given
        queryset.
        """
        for start in range(0, total, _batch_size):
            end = min(start + _batch_size, total)
            yield (start, end, total, _qs[start:end])

    def generate_items():
        queryset.order_by()  # Clearing... ordering by id if PK autoincremental
        for start, end, total, qs in batch_qs(queryset):
            for item in qs:
                yield item

    return GeneratorWithLen(generate_items(), total)

使用方法:

events = batch(Event.objects.all())
len(events) == events.count()
for event in events:
    # Do something with the Event

0

我通常使用原始的MySQL查询语句来完成这种任务,而不是使用Django ORM。

MySQL支持流模式,因此我们可以安全快速地循环遍历所有记录,而不会出现内存错误。

import MySQLdb
db_config = {}  # config your db here
connection = MySQLdb.connect(
        host=db_config['HOST'], user=db_config['USER'],
        port=int(db_config['PORT']), passwd=db_config['PASSWORD'], db=db_config['NAME'])
cursor = MySQLdb.cursors.SSCursor(connection)  # SSCursor for streaming mode
cursor.execute("SELECT * FROM event")
while True:
    record = cursor.fetchone()
    if record is None:
        break
    # Do something with record here

cursor.close()
connection.close()

参考:

  1. 从MySQL检索数百万行
  2. MySQL结果集流式处理与一次性获取整个JDBC ResultSet的性能对比

1
你仍然可以使用Django ORM来生成查询。只需在执行中使用结果的queryset.query即可。 - Pol

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接