SQLAlchemy在查询[INFORMATION_SCHEMA].[TABLES]时插入数据卡住了。

5

我有一个使用SQLAlchemy将数据插入到MS SQL Server DB的Python进程。当Python进程运行时,它会在插入期间挂起。我打开了SQLAlchemy日志以获得更多信息。我发现它挂起的原因是SQLAlchemy似乎正在请求整个数据库的表架构信息:

2020-10-30 08:12:07 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1235) INFO: SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) ORDER BY [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME]
2020-10-30 08:12:07 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1240) INFO: ('dbo', 'BASE TABLE')

我数据库中有其他“东西”正在进行,包括一些未完成的事务,我的猜测是由于某种原因查询 [INFORMATION_SCHEMA].[TABLES] 会创建一些死锁或阻塞。

我也读到过 (这里),[INFORMATION_SCHEMA].[TABLES] 是一个视图,不会引起死锁,这与我猜测导致此问题的情况相矛盾。

我的问题是:我能否更改 SQLAlchemy 的配置/设置,以便它首先不进行此查询?

更新1:插入的 Python 代码如下:

with sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params).connect() as connection:
    # df is a Pandas DataFrame
    df.to_sql(name=my_table, con=connection, if_exists='append', index=False)

请注意,当我在其他没有进行数据库事务的时间运行Python脚本时,代码可以正常工作。在这些情况下,日志会立即继续列出数据库中的所有表格:
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1235) INFO: SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) ORDER BY [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME]
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine._execute_context(base.py:1240) INFO: ('dbo', 'BASE TABLE')
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine._init_metadata(result.py:810) DEBUG: Col ('TABLE_NAME',)
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine.process_rows(result.py:1260) DEBUG: Row ('t_table1',)
2020-10-30 08:13:03 [11444:6368] sqlalchemy.engine.base.Engine.process_rows(result.py:1260) DEBUG: Row ('t_table2',)
...

更新2: 显然,当在一个未提交的事务中创建一个表或其他对象时,查询[INFORMATION_SCHEMA].[TABLES]将被阻塞(来源)。有没有人熟悉SQLAlchemy的内部结构,以建议如何防止它首先进行此查询?

更新3:在将此问题发布到SQLAlchemy github后(问题链接),SQLAlchemy开发人员确认[INFORMATION_SCHEMA].[TABLES]的查询实际上是由Pandas函数 to_sql()引起的。

因此,我的新问题是,有没有人知道如何在Pandas的to_sql()函数中禁用此行为?我查看了文档,但找不到任何有助于解决这个问题的东西。


在GitHub上讨论此处 - Gord Thompson
@Joe - “只是为了明确,当你测试这个时,你在同一个数据库上有一个开放的事务,在该事务中创建了一个新表?” - 是的,完全正确。 - Gord Thompson
有没有人找到解决这个问题的方法?我们因为这个问题卡了几天了。 - PlunkettBoy
我目前还没有找到更好的解决方案,只能临时使用 monkey patching Pandas。 - Joe
当一个独立的进程正在创建大型列存储索引时,也会遇到这个问题。它会锁定 information_schema 并阻止 to_sql 运行。 - Gabe
显示剩余9条评论
3个回答

1
在调用 to_sql 前执行 set transaction isolation level read uncommitted。

1

我对SQLAlchemy不是很熟悉,但我可以告诉你关于这个问题的Pandas方面。

Pandas会自动创建一个新表,如果该表不存在。它确定表是否存在的方式是调用SQL Alchemy中的has_table()has_table()的工作方式是查询信息模式。(至少在MySQL和MSSQL中是这样工作的。)

实现细节

以下是我在Pandas和SQLAlchemy中跟踪此逻辑时发现的内容。我们从pandas/io/sql.py中的to_sql()开始。

        table = SQLTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )
        table.create()

SQLTable.create() 在这里被定义:

class SQLTable(PandasObject):
    [...]
    def create(self):
        if self.exists():
            if self.if_exists == "fail":
                raise ValueError(f"Table '{self.name}' already exists.")
            elif self.if_exists == "replace":
                self.pd_sql.drop_table(self.name, self.schema)
                self._execute_create()
            elif self.if_exists == "append":
                pass
            else:
                raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
        else:
            self._execute_create()

请注意它无条件地调用了exists()。在SQLTable.exists()内部,您会发现这个:
    def exists(self):
        return self.pd_sql.has_table(self.name, self.schema)

这最终在SQLAlchemy中调用has_table()https://docs.sqlalchemy.org/en/13/core/internals.html#sqlalchemy.engine.default.DefaultDialect.has_table 对于MSSQL,在SQLAlchemy的sqlalchemy/dialects/mssql/base.py中实现:
    @_db_plus_owner
    def has_table(self, connection, tablename, dbname, owner, schema):
        if tablename.startswith("#"):  # temporary table
            [...]
        else:
            tables = ischema.tables

            s = sql.select(tables.c.table_name).where(
                sql.and_(
                    tables.c.table_type == "BASE TABLE",
                    tables.c.table_name == tablename,
                )
            )

            if owner:
                s = s.where(tables.c.table_schema == owner)

            c = connection.execute(s)

            return c.first() is not None

(ischema 是 information_schema 的缩写,这段代码在该表上运行了一个 select 语句。)

如何解决

我没有看到一个好的、简单的方法来解决这个问题。Pandas 假设 has_table() 是一项廉价操作。MSSQL 不遵循这个假设。无论 if_exists 设置为什么,Pandas 都会在 to_sql() 中调用 has_table()

不过,我可以想到一个 hacky 的方法来解决这个问题。如果你 monkey-patch pandas.io.sql.SQLTable.create(),使其成为一个 no-op,那么你就可以欺骗 Pandas 认为表已经存在了。这样做的缺点是 Pandas 不会自动创建表。


2
我认为你基本上是在正确的轨道上,但问题发生在插入之后。当Pandas to_sql检查表是否可能存在大小写敏感性问题(请查看:#check for potentially case sensitivity issues(GH7815) in to_sql)时,它最终会调用SQLAlchemy中的函数get_table_names,这最终导致了阻塞。 - Joe
1
这里有一个相关的(尽管已关闭)GitHub问题:https://github.com/pandas-dev/pandas/issues/36542 - Gabe
这是一个 GitHub 存储库链接,无法直接提供翻译内容。请提供您要翻译的具体文本,我将尽力帮助您进行翻译。 - Gabe

0

我创建了这个问题以便跟踪

这只会发生在表名中有大写字母的情况下。`MyTable`会卡住,而`mytable`会成功完成。
这是Pandas还是SQLAlchemy需要纠正的问题?
解决方法
有一个解决方法 - 在SQLAlchemy引擎中将`Transaction Isolation Level`设置为`Read Uncommitted`。

https://docs.sqlalchemy.org/en/20/dialects/mssql.html#transaction-isolation-level

engine = sa.create_engine("mssql+pyodbc:///?odbc_connect={}".format(params)
                          , fast_executemany=True
                           , isolation_level="READ UNCOMMITTED"
                          )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接