内存高效的内置SqlAlchemy迭代器/生成器?

106

我有一个包含约10万条记录的MySQL表,使用SqlAlchemy进行接口交互。 我发现对于这个表的大型子集的查询将会消耗太多内存,尽管我认为我正在使用一种内置的生成器来智能地获取数据集的小块:

for thing in session.query(Things):
    analyze(thing)
为了避免这种情况,我发现我必须构建自己的迭代器,分块处理数据:
lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)

这是正常的吗?对于SA内置发生器,我是否遗漏了什么?

回答这个问题的答案似乎表明不应该期望内存消耗。


我有一个非常相似的东西,只是它产生了“thing”。比所有其他解决方案都要好。 - iElectric
3
是Thing.id > lastThingID吗?“rows”是什么意思? - synergetic
7个回答

129
大多数DBAPI实现在获取行时完全缓冲它们 - 所以通常,在SQLAlchemy ORM甚至获得一个结果之前,整个结果集都已经在内存中了。
但是,Query的工作方式是默认情况下完全加载给定的结果集,然后再返回您的对象。这里的原理涉及到不仅仅是简单的SELECT语句的查询。例如,在连接到其他可能在一个结果集中多次返回相同对象标识的表(与急切加载一起很常见)时,需要将整个行集放在内存中,以便可以返回正确的结果,否则集合等可能只有部分填充。
因此,Query通过yield_per()提供了更改此行为的选项。此调用将导致Query按批次生成行,其中您给出批处理大小。正如文档所述,如果您没有对集合进行任何类型的急切加载,则此选项才是合适的,因此基本上是如果您确实知道自己在做什么。此外,如果底层DBAPI预缓冲行,则仍将存在该内存开销,因此该方法的扩展性仅比不使用它略好。
我很少使用yield_per(); 相反,我使用更好的窗口函数方法来代替您上面提到的LIMIT方法。 LIMIT和OFFSET有一个巨大的问题,即非常大的OFFSET值会导致查询变得越来越慢,因为N个OFFSET会使其遍历N行 - 这就像执行相同的查询五十次而不是一次,每次读取越来越多的行。通过窗口函数方法,我预先获取了一组“窗口”值,这些值引用我想要选择的表的块。然后,我发出单独的SELECT语句,每个语句每次从其中一个窗口中拉取。

窗口函数方法在wiki上,我使用它取得了巨大的成功。

还请注意:并非所有数据库都支持窗口函数;您需要Postgresql、Oracle或SQL Server。在我看来,至少使用Postgresql绝对是值得的 - 如果您正在使用关系型数据库,那么最好使用最好的。


1
yield_per()选项始终存在,当您确信发出的查询与提供部分结果集兼容时,可以使用它。我花了几天时间进行马拉松式的尝试,试图在所有情况下启用此行为,但总是会出现一些模糊不清的边缘情况,直到您的程序使用其中之一失败。特别是,不能假定依赖于排序。像往常一样,我欢迎实际的代码贡献。 - zzzeek
1
主要问题在于需要考虑到每种可能的使用情况,包括当相同的类组多次不相关地出现在结果中时,以及如何执行上述“检查连接结果结束”的逻辑,该逻辑必须递归地工作。一旦将行为变得隐式化,那么您就需要承担所有未来的错误。如果我有一个开发团队的资源支持,也许这是可以支持的,但是它变得非常复杂,而收益却不大。 - zzzeek
@zzzeek感谢您的解释,建议的窗口函数方法可能对我不起作用,因为在浏览数据结果时,有些行可能会更改(在两个SELECT之间),但我需要当第一个SELECT进来时实际的数据。我无法将所有结果加载到内存中,因为它太大了。所以我正在测试yield_per,看起来内存仍然受控制。我想使用yield_per函数,并尝试理解它是否适合我的情况。是否有更详细的说明示例,说明何时以及为什么连接或子查询会成为问题? - schatten
@zzzeek 我已经阅读了这个线程和手册页面好几次,但仍然不明白查询何时以及为什么可能不适合使用yield_per。谢谢。 - schatten
1
由于我正在使用Postgres,看起来可以使用可重复读取的只读事务,并在该事务中运行所有窗口查询。 - schatten
显示剩余10条评论

51

我不是数据库专家,但当使用SQLAlchemy作为简单的Python抽象层(即不使用ORM查询对象)时,我想出了一种令人满意的解决方案,可以查询一个拥有3亿行数据表而不会导致内存使用过大...

这里有一个虚拟例子:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

然后,我使用SQLAlchemy的fetchmany()方法在一个无限的while循环中迭代结果:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()

这种方法让我能够进行各种数据聚合,而不会有危险的内存开销。

注意: stream_results 适用于Postgres和 pyscopg2 适配器,但我猜它不会与任何DBAPI或数据库驱动程序一起工作...

这篇博客文章 中有一个有趣的用例启发了我上述的方法。


3
如果一个人正在使用Postgres或MySQL(与pymysql一起),在我看来,这应该是被接受的答案。 - Yuki Inoue
2
拯救了我的生命,我发现我的查询越来越慢。我已经在pyodbc上进行了上述仪器化(从SQL Server到Postgres),现在它运行得像梦一样顺畅。 - Ed Baker
2
这对我来说是最好的方法。由于我正在使用ORM,我需要将SQL编译成我的方言(Postgres),然后直接从连接(而不是会话)执行,如上所示。我在这个问题中找到了编译“how to”的方法:https://dev59.com/q2445IYBdhLWcg3w9Oss。速度的提高很大。从JOINS到SUBQUERIES的转换也大大提高了性能。还建议使用sqlalchemy_mixins,使用smart_query有助于构建最有效的查询。https://github.com/absent1706/sqlalchemy-mixins - Gustavo Gonçalves

17

我一直在研究如何使用SQLAlchemy实现高效的遍历/分页,并希望更新这个答案。

我认为你可以使用切片调用来正确限制查询范围,并且你可以高效地重复使用它。

示例:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1

这看起来非常简单和快速。我不确定.all()是否必要。我注意到第一次调用后速度有了很大的提升。 - hamx0r
@hamx0r 我知道这是一个旧评论,所以只是留下来供后人参考。如果没有.all()things变量就是一个不支持len()的查询。 - David
1
在底层,这会生成形如 SELECT * FROM tbl LIMIT x OFFSET y 的查询语句,因此对于大型结果集,随着偏移量的增加,它将变得越来越慢,正如被接受的答案所描述的那样。 - snakecharmerb

10

遵循Joel的回答精神,我使用以下内容:

WINDOW_SIZE = 1000
def qgen(query):
    start = 0
    while True:
        stop = start + WINDOW_SIZE
        things = query.slice(start, stop).all()
        if len(things) == 0:
            break
        for thing in things:
            yield thing
        start += WINDOW_SIZE

things = query.slice(start, stop).all()将在最后返回[],而while循环永远不会停止。 - Martin Reguly

4
使用LIMIT/OFFSET是不好的,因为您需要先找到所有{OFFSET}列,所以OFFSET越大,请求时间就越长。 对于具有大量数据的大型表,使用窗口查询也会导致糟糕的结果(您需要等待太长时间才能得到第一批结果,在我的情况下,这对分块Web响应来说并不好)。 最好的方法在此处给出https://dev59.com/6F8d5IYBdhLWcg3wnzNz#27169302。在我的情况下,我简单地使用日期时间字段上的索引解决了问题,并使用datetime>=previous_datetime获取下一个查询。很傻,因为我之前在不同情况下使用了该索引,但认为对于获取所有数据的窗口查询会更好。在我的情况下,我错了。

2
据我所知,第一种变体仍然会从表中获取所有元组(使用一个SQL查询),但在迭代时为每个实体构建ORM表示。因此,它比在迭代之前构建所有实体的列表更有效率,但您仍然需要将所有(原始)数据加载到内存中。
因此,在大型表上使用LIMIT似乎对我是个好主意。

0

如果您正在使用Postgres或支持游标的RDBMS,则可以非常简单地高效迭代大量行:

 with db_session() as session:
     for partition in session.stream(select(Thing.id)).partitions(1000):
         for item in partition:
             analyze(item)

这将创建一个向前的游标,以每批1000行获取结果,从而使服务器和客户端的内存使用最小化。


这是否是由SQLAlchemy ORM正常使用的Session对象?我得到了“'Session' object has no attribute 'stream'”错误。 - Matt
1
在这里,session = sessionmaker(class_=AsyncSession, future=True, ...)()。我正在使用Sqlalchemy主版本,1.4也应该支持这个。 - auxsvr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接