SQLAlchemy使用生成器执行批量操作

5
问题:对于Connection对象的execute函数,是否可以使用返回字典而不是字典列表的生成器来执行“executemany”插入操作?
详情:我正在通过核心表达式学习SQLAlchemy。作为一个测试,我有一个相当大的数据集,通过迭代器从文件中访问,我试图将其转移到PostgreSQL表中,但是逐个插入行非常缓慢(请参见下面的示例1)。根据文档,如果传递一个字典列表而不是单个字典,则Connnection对象的execute()函数将执行等效的executemany()。我进行了一些快速测试,确实对于插入组,这种方法要快得多。不幸的是,由于我的大型数据集,我无法在内存中创建完整的字典列表,因此出现了我的问题...
示例1:对于大量数据,以下(伪)代码非常缓慢
from sqlalchemy import MetaData, Table, Column

metadata = MetaData()
data = Table('data', metadata, Column...)

engine = sql.create_engine('postgresql://user:pass$@localhost/testdb')
metadata.create_all(engine)

conn = engine.connect()
ins = data.insert()
for datum in large_data_iterator:
    datum_dict = do_some_proc(datum)
    conn.execute(ins, datum_dict)

由于execute可以接受多个值,将最后的for循环替换为以下生成器版本会更好:
def datagen(iterator):
    for datum in large_data_iterator:
        datum_dict = do_some_proc(datum)
        yield datum_dict

conn = engine.connect()
ins = data.insert()
conn.execute(ins, datagen(large_data_iterator))

然而,这引发了以下异常:AttributeError: 'list' object has no attribute 'keys'。
有人知道是否可能使生成器版本工作吗?或者有更好的方法也是很好的。谢谢!
注意:我测试了一个修改后的生成器表达式,它将块作为字典列表生成(如下所示),并且比单个执行更快。但是,我不知道如何选择最佳块数,并且我担心增加了我的生成器代码的复杂性,可能会更容易出错。(但如果这是唯一的方法...)
def datagen(iterator):
    output = []
    N = 0
    for datum in large_data_iterator:
        datum_dict = do_some_proc(datum)
        output.append(datum_dict)
        N += 1
        if N == 100: # or whatever
            yield output
            N = 0
            output = []
    if output != []:
        yield output
1个回答

0

Connectionexecution_options,可以使用 stream_results 参数,但不幸的是在底部写着“该标志目前仅被 psycopg2 方言理解”,尽管还有其他支持流式传输的驱动程序(例如 oursql)。

在 sqlalchemy 完全支持它之前,您可以轻松编写一个帮助函数来将任何可迭代对象分成块,以避免修改生成器时出现错误。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接