使用asyncio创建tqdm进度条

13

我正在尝试使用asyncio收集任务来创建tqdm进度条。

希望在每个任务完成时逐步更新进度条。尝试了以下代码:

import asyncio
import tqdm
import random

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        await asyncio.sleep(random.random())
        f *= i
    print(f"Task {name}: factorial {number} = {f}")

async def tq(flen):
    for _ in tqdm.tqdm(range(flen)):
        await asyncio.sleep(0.1)

async def main():
    # Schedule the three concurrently

    flist = [factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4)]

    await asyncio.gather(*flist, tq(len(flist)))

asyncio.run(main())

...但这只是完成tqdm进度条,然后处理阶乘。

有没有办法在每个asyncio任务完成后让进度条移动?

5个回答

17

截至tqdm版本4.48.0, 可以使用tqdm.asyncio.tqdm.as_completed()

import tqdm.asyncio
...
for f in tqdm.asyncio.tqdm.as_completed(flist):
    await f

现在你甚至可以使用 tqdm.asyncio.gather(*flist)。也就是说,只需要在常规的 asyncio.gather 函数前加上 tqdm 前缀即可。 - Danferno
1
@Danferno,这是tqdm.asyncio模块中 tqdm_asyncio 类的 gather 类方法。使用方法如下:首先要从tqdm.asyncio模块导入tqdm_asyncio,然后再使用tqdm_asyncio.gather(*flist) - SolomidHero
tqdm_asyncio.gather(*flist)的效果非常好!非常感谢@SolomidHero。 - prrao

8

现在,我对asyncho不是特别熟悉,但我曾经成功地在Python的多进程中使用tqdm。 以下代码更改似乎可以同时更新进度条和打印结果,这可能足以让您开始。

responses = [await f
                 for f in tqdm.tqdm(asyncio.as_completed(flist), total=len(flist))]

在您的main定义中,上述代码应替换为await asyncio.gather(*flist, tq(len(flist)))

如需更多信息,请参考asyncio aiohttp progress bar with tqdm

若只需打印进度条一次并更新它,可以执行以下操作,将进度条的描述更新为包含您的消息:

import asyncio
import tqdm


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        f *= i
    return f"Task {name}: factorial {number} = {f}"

async def tq(flen):
    for _ in tqdm.tqdm(range(flen)):
        await asyncio.sleep(0.1)


async def main():
    # Schedule the three concurrently

    flist = [factorial("A", 2),
             factorial("B", 3),
             factorial("C", 4)]

    pbar = tqdm.tqdm(total=len(flist))
    for f in asyncio.as_completed(flist):
        value = await f
        pbar.set_description(value)
        pbar.update()

if __name__ == '__main__':
    asyncio.run(main())

这个可以 @Dragos,但是它会在每个阶乘之后分三个步骤打印进度条。有没有一种方法可以保持tqdm进度条的覆盖,而不是分为三个步骤? - reservoirinvest
@reservoirinvest 我已经更新了我的答案,以便只有一个进度条。这意味着您的打印语句将成为进度条上的描述。我认为一开始有3个进度条的原因是打印操作,根据我之前使用tqdm时看到的情况。 - afterburner

4

在Dragos的代码中进行了一些小改动,使用了pbar格式并使用tqdm.write()几乎得到了我想要的结果,如下所示:

import asyncio
import random

import tqdm


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(random.random())
        f *= i
    return f"Task {name}: factorial {number} = {f}"

async def tq(flen):
    for _ in tqdm.tqdm(range(flen)):
        await asyncio.sleep(0.1)


async def main():

    flist = [factorial("A", 2),
             factorial("B", 3),
             factorial("C", 4)]

    pbar = tqdm.tqdm(total=len(flist), position=0, ncols=90)
    for f in asyncio.as_completed(flist):
        value = await f
        pbar.set_description(desc=value, refresh=True)
        tqdm.tqdm.write(value)
        pbar.update()

if __name__ == '__main__':
    asyncio.run(main())

3

正如Danferno在他的评论中所说,现在有一种更简单的解决方案,您只需使用tqdm提供的gather函数替换asyncio gather即可。您的代码可以改为:

import asyncio
from tqdm.asyncio import tqdm
import random

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        await asyncio.sleep(random.random())
        f *= i
    print(f"Task {name}: factorial {number} = {f}")


async def main():
    # Schedule the three concurrently

    flist = [factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4)]

    await tqdm.gather(*flist)

asyncio.run(main())

0

这里是一个关于TQDM的异步包装器,以返回有序结果:

import asyncio
from typing import Any, Coroutine, Iterable, List, Tuple

from tqdm import tqdm


async def aprogress(tasks: Iterable[Coroutine], **pbar_kws: Any) -> List[Any]:
    """Runs async tasks with a progress bar and returns an ordered result."""

    if not tasks:
        return []

    async def tup(idx: int, task: Coroutine) -> Tuple[int, Any]:
        """Returns the index and result of a task."""
        return idx, await task

    _tasks = [tup(i, t) for i, t in enumerate(tasks)]
    pbar = tqdm(asyncio.as_completed(_tasks), total=len(_tasks), **pbar_kws)
    res = [await t for t in pbar]
    return [r[1] for r in sorted(res, key=lambda r: r[0])]


if __name__ == "__main__":

    import random

    async def test(idx: int) -> Tuple[int, int]:
        sleep = random.randint(0, 5)
        await asyncio.sleep(sleep)
        return idx, sleep

    _tasks = [test(i) for i in range(10)]
    _res = asyncio.run(aprogress(_tasks, desc="pbar test"))
    print(_res)

完整源代码


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接