tqdm能够与数据库读取一起使用吗?

13
在从 SQL 数据库读取大型关系到 pandas dataframe 时,如果能够有进度条就更好了,因为元组数量是静态已知的,可以估计 I/O 速率。看起来 tqdm 模块有一个函数 tqdm_pandas,可以在列上映射函数时报告进度,但默认情况下调用它不会像这样报告 I/O 进度。是否可以使用 tqdm 在调用 pd.read_sql 时制作进度条?

3
抱歉,由于pandas只是将查询分派给数据库然后等待响应,所以无法提供中间反馈,直到整个结果集到达之前都不会有任何中间反馈。 - Michael Griffiths
3个回答

8

编辑:答案有误 - chunksize 对数据库操作没有影响。请参见下面的评论。

您可以使用chunksize参数来执行以下操作:

chunks = pd.read_sql('SELECT * FROM table', con=conn, chunksize=100)

df = pd.DataFrame()
for chunk in tqdm(chunks):
    df = pd.concat([df, chunk])

我认为这也会使用更少的内存。


12
这不起作用的原因是块处理是在read_sql完成加载整个数据集之后进行的。 在这里,tqdm所做的就是测量pd.concat操作的进度。 - Steven
3
逐个连接块非常低效,通常应避免。 - Wei Qiu
那么,如果它不起作用,这怎么会被标记为被接受的答案呢? - jtlz2
1
我刚刚发现,如果我们在create_engine中传递execution_options = {'stream_results':True},那么它就不会一次性加载整个内容。相反,数据库将一次流式传输一个块,物理内存限制就不会困扰你了。 - Oshan

0

我想提供一种替代性答案,之前的解决方案只是提供了 concat 操作的状态。还有一些"最佳猜测"的工作,但我认为这个解决方案离可行解决方案更近了一些。

摘要

总体思路是每秒钟检查函数执行的状态,并填充一个状态栏,达到任意定义的秒数上限。这些值会受到影响,如果执行时间超过了定义的最大时间,那么就会移除状态栏。

  • 创建一个手动tqdm进度条,并根据需要调整设置。
  • 使用 asyncio.create_task 来生成一个任务。使用 asyncio.to_thread 将 pandas 的读取方法包装为协程。
  • 在 while 循环中检查任务状态,如果执行未完成则更新进度条,否则返回结果。
async def sql_handler(query, conn):
    progress = tqdm(desc="Executing SQL Query",total=8)
    task = asyncio.create_task(
        asyncio.to_thread(pd.read_sql, sql=query, con=connection)
    )

    while True:
        status = task.done()
        progress.update()
        if status:
            return task.result()
        await asyncio.sleep(1)

async def main():
    await self.sql_handler(query=..., conn=...)

if __name__ == "__main__":
    asyncio.run(main())

完整示例

这是一个更完整的示例,使用sqlalchemy从snowflake数据库中读取,并包括一些设置以帮助获得所需的输出

import asyncio
import pandas as pd
from pandas import DataFrame
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from tqdm import tqdm

async def sql_handler(query: str, connection: Engine) -> DataFrame:
    progress = tqdm(
        desc="Executing SQL Query",
        ncols=100,
        bar_format="{desc}: {bar}[{elapsed}]",
        total=8,
        )
    task = asyncio.create_task(
        asyncio.to_thread(pd.read_sql, sql=query, con=connection)
    )
    while True:
        status = task.done()
        progress.update()
        if status:
            return task.result()
        # With progress.total = 8 -> Check status every 1 second for 8 seconds
        await asyncio.sleep(1)

async def main() -> None:
    conn = create_engine(
        URL(account=...,
            user=...,
            password=...,
            warehouse=...,
            database=...,
            schema=...,
        )
    )
    query = "SELECT * FROM table"
    results_data_frame = await self.sql_handler(query, conn)
    print(results_data_frame.to_dict(orient="records")

if __name__ == "__main__":
    asyncio.run(main())

输出

Executing SQL Query: █████████████████████████████████████████      [00:03]

asyncio_tqdm


-1

可以的!

扩展答案这里和Alex的答案,包括tqdm,我们得到:

# get total number or rows
q = f"SELECT COUNT(*) FROM table"
total_rows = pd.read_sql_query(q, conn).values[0, 0]
# note that COUNT implementation should not download the whole table. 
# some engine will prefer you to use SELECT MAX(ROWID) or whatever...

# read table with tqdm status bar
q = f"SELECT * FROM table"
rows_in_chunk = 1_000
chunks = pd.read_sql_query(q, conn, chunksize=rows_in_chunk)
df = tqdm(chunks, total=total_rows/rows_in_chunk)
df = pd.concat(df)

输出示例:

39%|███▉      | 99/254.787 [01:40<02:09,  1.20it/s]

这个不起作用。pd.read_sql_query会将所有数据块全部加载到内存中。tqdm仅显示连接过程的进度。 - Oshan
我认为它确实有效,否则pd.read_sql_query中的chunksize是用于什么的... 根据chunsize的手册: 如果指定了值,返回一个迭代器,其中每个块包含的行数为chunksize。 在这段代码中,当pd.concat使用df时,Python首先在其中提取带有提取过程的tqdm,完成提取后再进行连接。 - lisrael1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接