在从 SQL 数据库读取大型关系到 pandas dataframe 时,如果能够有进度条就更好了,因为元组数量是静态已知的,可以估计 I/O 速率。看起来 tqdm 模块有一个函数 tqdm_pandas,可以在列上映射函数时报告进度,但默认情况下调用它不会像这样报告 I/O 进度。是否可以使用 tqdm 在调用 pd.read_sql 时制作进度条?
编辑:答案有误 - chunksize
对数据库操作没有影响。请参见下面的评论。
您可以使用chunksize
参数来执行以下操作:
chunks = pd.read_sql('SELECT * FROM table', con=conn, chunksize=100)
df = pd.DataFrame()
for chunk in tqdm(chunks):
df = pd.concat([df, chunk])
我认为这也会使用更少的内存。
read_sql
完成加载整个数据集之后进行的。 在这里,tqdm
所做的就是测量pd.concat
操作的进度。 - Stevenexecution_options = {'stream_results':True}
,那么它就不会一次性加载整个内容。相反,数据库将一次流式传输一个块,物理内存限制就不会困扰你了。 - Oshan我想提供一种替代性答案,之前的解决方案只是提供了 concat 操作的状态。还有一些"最佳猜测"的工作,但我认为这个解决方案离可行解决方案更近了一些。
摘要
总体思路是每秒钟检查函数执行的状态,并填充一个状态栏,达到任意定义的秒数上限。这些值会受到影响,如果执行时间超过了定义的最大时间,那么就会移除状态栏。
asyncio.create_task
来生成一个任务。使用 asyncio.to_thread
将 pandas 的读取方法包装为协程。async def sql_handler(query, conn):
progress = tqdm(desc="Executing SQL Query",total=8)
task = asyncio.create_task(
asyncio.to_thread(pd.read_sql, sql=query, con=connection)
)
while True:
status = task.done()
progress.update()
if status:
return task.result()
await asyncio.sleep(1)
async def main():
await self.sql_handler(query=..., conn=...)
if __name__ == "__main__":
asyncio.run(main())
完整示例
这是一个更完整的示例,使用sqlalchemy从snowflake数据库中读取,并包括一些设置以帮助获得所需的输出
import asyncio
import pandas as pd
from pandas import DataFrame
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from tqdm import tqdm
async def sql_handler(query: str, connection: Engine) -> DataFrame:
progress = tqdm(
desc="Executing SQL Query",
ncols=100,
bar_format="{desc}: {bar}[{elapsed}]",
total=8,
)
task = asyncio.create_task(
asyncio.to_thread(pd.read_sql, sql=query, con=connection)
)
while True:
status = task.done()
progress.update()
if status:
return task.result()
# With progress.total = 8 -> Check status every 1 second for 8 seconds
await asyncio.sleep(1)
async def main() -> None:
conn = create_engine(
URL(account=...,
user=...,
password=...,
warehouse=...,
database=...,
schema=...,
)
)
query = "SELECT * FROM table"
results_data_frame = await self.sql_handler(query, conn)
print(results_data_frame.to_dict(orient="records")
if __name__ == "__main__":
asyncio.run(main())
输出
Executing SQL Query: █████████████████████████████████████████ [00:03]
可以的!
扩展答案这里和Alex的答案,包括tqdm,我们得到:
# get total number or rows
q = f"SELECT COUNT(*) FROM table"
total_rows = pd.read_sql_query(q, conn).values[0, 0]
# note that COUNT implementation should not download the whole table.
# some engine will prefer you to use SELECT MAX(ROWID) or whatever...
# read table with tqdm status bar
q = f"SELECT * FROM table"
rows_in_chunk = 1_000
chunks = pd.read_sql_query(q, conn, chunksize=rows_in_chunk)
df = tqdm(chunks, total=total_rows/rows_in_chunk)
df = pd.concat(df)
输出示例:
39%|███▉ | 99/254.787 [01:40<02:09, 1.20it/s]
pd.read_sql_query
会将所有数据块全部加载到内存中。tqdm
仅显示连接过程的进度。 - Oshan
pandas
只是将查询分派给数据库然后等待响应,所以无法提供中间反馈,直到整个结果集到达之前都不会有任何中间反馈。 - Michael Griffiths