使用SQLAlchemy的临时表

15

我正尝试使用SQLAlchemy创建一个临时表,并将其与现有表连接。目前为止,我的代码如下:

engine = db.get_engine(db.app, 'MY_DATABASE')
df = pd.DataFrame({"id": [1, 2, 3], "value": [100, 200, 300], "date": [date.today(), date.today(), date.today()]})
temp_table = db.Table('#temp_table',
                      db.Column('id', db.Integer),
                      db.Column('value', db.Integer),
                      db.Column('date', db.DateTime))
temp_table.create(engine)
df.to_sql(name='tempdb.dbo.#temp_table',
          con=engine,
          if_exists='append',
          index=False)
query = db.session.query(ExistingTable.id).join(temp_table, temp_table.c.id == ExistingTable.id)
out_df = pd.read_sql(query.statement, engine)
temp_table.drop(engine)
return out_df.to_dict('records')

由于to_sql执行的插入语句未被运行,导致没有返回任何结果(我认为这是因为它们使用sp_prepexec运行,但我不完全确定)。

然后我尝试直接编写SQL语句(CREATE TABLE #temp_table...INSERT INTO #temp_table...SELECT [id] FROM...),然后运行pd.read_sql(query, engine),但出现了以下错误消息:

此结果对象不返回行。 它已自动关闭。

我猜这是因为该语句不仅仅是SELECT

我该如何解决这个问题(两种方法都可以,虽然第一种方法更好,因为它避免了硬编码SQL)。 明确地说,我不能修改现有数据库中的架构 - 它是供应商数据库。


ExistingTable 中是否有任何记录? - Azat Ibrakov
@AzatIbrakov 是的。我实际上将它更改为左连接,并添加了 temp_table.c.date 以确保。我得到了在 date 列中有 None 的行。 - Kris Harper
我在sqlite上进行了测试,它会导致问题。 - Azat Ibrakov
@AzatIbrakov 嗯,可以改成“日期”或其他可行的名称。实际上,该值并不重要。不过我不确定sqllite如何处理临时表。 - Kris Harper
我认为这是因为它们是使用sp_prepexec运行的。这是有道理的。在存储过程中创建的临时表,包括对sp_executesql、sp_prepexec等的调用,与在顶层批处理中创建的临时表不同,并且将在调用结束时自动销毁。 - David Browne - Microsoft
显示剩余7条评论
3个回答

25

如果要插入到临时表的记录数较少/适中,一种可能的方法是使用 literal subqueryvalues CTE,而不是创建临时表。

# MODEL
class ExistingTable(Base):
    __tablename__ = 'existing_table'
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String)
    # ...

假设还要将以下数据插入到temp表中:

Assume also following data is to be inserted into temp table:

# This data retrieved from another database and used for filtering
rows = [
    (1, 100, datetime.date(2017, 1, 1)),
    (3, 300, datetime.date(2017, 3, 1)),
    (5, 500, datetime.date(2017, 5, 1)),
]
创建一个包含该数据的CTE或子查询:
stmts = [
    # @NOTE: optimization to reduce the size of the statement:
    # make type cast only for first row, for other rows DB engine will infer
    sa.select([
        sa.cast(sa.literal(i), sa.Integer).label("id"),
        sa.cast(sa.literal(v), sa.Integer).label("value"),
        sa.cast(sa.literal(d), sa.DateTime).label("date"),
    ]) if idx == 0 else
    sa.select([sa.literal(i), sa.literal(v), sa.literal(d)])  # no type cast

    for idx, (i, v, d) in enumerate(rows)
]
subquery = sa.union_all(*stmts)

# Choose one option below.
# I personally prefer B because one could reuse the CTE multiple times in the same query
# subquery = subquery.alias("temp_table")  # option A
subquery = subquery.cte(name="temp_table")  # option B

使用所需的连接和过滤条件创建最终查询:

query = (
    session
    .query(ExistingTable.id)
    .join(subquery, subquery.c.id == ExistingTable.id)
    # .filter(subquery.c.date >= XXX_DATE)
)

# TEMP: Test result output
for res in query:
    print(res)    

最后,获取pandas数据框:

out_df = pd.read_sql(query.statement, engine)
result = out_df.to_dict('records')

哈,我刚在今天上班路上就想做这件事了。我会试一试并让你知道的。 - Kris Harper
1
性能方面,这个解决方案有多好?Union all 对我来说似乎非常沉重...需要测试一下,但如果有关于性能的一些想法就太棒了... - Alexander B.
为什么要使用 UNION ALL? - user749127
使用UNION ALL而不是仅使用UNION,以避免数据库引擎将重复项合并为一个。可以在此代码中执行此操作。有关更多信息,请参见https://dev59.com/qnVD5IYBdhLWcg3wNY1Z#49928。 我认为这个解决方案并不“繁重”,因为它明确适用于不是过多的记录数。 - van
@van 你认为“小/中等”记录数量是多少?我想使用临时表(并使用csv进行COPY FROM)或CTE来实现大约2500行的复杂插入。对于此操作,使用INSERT INTO VALUES非常缓慢。 - freethebees
@freethebees,对你来说最好的方法是对解决方案进行分析并查看其是否有效。此外,自回答发布以来,sqalalchemy应该有更优化的方式来插入多个数据与其他结构。 - van

1
你可以尝试使用另一种解决方案 - 进程关键表
进程关键表只是一个永久性表格,用作临时表格。为了允许进程同时使用该表格,该表格具有额外的列来标识进程。最简单的方法是全局变量@@spid(@@spid是SQL Server中的进程ID)。

...

一个可替代进程键的选择是使用GUID(数据类型uniqueidentifier)。

http://www.sommarskog.se/share_data.html#prockeyed


你是在建议在tempdb中创建这个表吗?我认为无论哪种方式,都会遇到我在问题的第二部分遇到的相同问题,即read_sql不返回任何行。 - Kris Harper
这个表应该在你的数据库(MY_DATABASE)中创建,而不是临时数据库。虽然不是很好,但应该可以工作。 - Mikhail Lobanov
我没有权限在那个数据库中创建表。那是一个供应商的数据库。 - Kris Harper

0
为什么不使用 SQLAlchemy values 表达式呢?这非常简单,正如文档中所述:

...列表达式和实际数据对于Values来说是分开的。构造函数通常接收列表达式,比如column()构造函数,然后通过Values.data()方法传递数据,数据以列表形式传递,可以多次调用该方法以添加更多数据...

文档还提供了一个很好的示例:

from sqlalchemy import column
from sqlalchemy import values

value_expr = values(
    column('id', Integer),
    column('name', String),
    name="my_values"
).data(
    [(1, 'name1'), (2, 'name2'), (3, 'name3')]
)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接