如何在concurrent.futures中使用tqdm?

95

我有一个多线程函数,我想使用tqdm来显示状态栏。是否有一种简单的方法可以在ThreadPoolExecutor中显示状态栏呢?让我感到困惑的是它的并行化部分。

import concurrent.futures

def f(x):
    return f**2

my_iter = range(1000000)

def run(f,my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        function = list(executor.map(f, my_iter))
    return results

run(f, my_iter) # wrap tqdr around this function?

2
你可以使用 from tqdm.contrib.concurrent import process_map,参见 https://dev59.com/8lgQ5IYBdhLWcg3w-44d#59905309 - dina
6个回答

132

您可以将tqdm包装在executor的周围,如下所示,以跟踪进度:

list(tqdm(executor.map(f, iter), total=len(iter)))

这是您的示例:

import time  
import concurrent.futures
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2

def run(f, my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(tqdm(executor.map(f, my_iter), total=len(my_iter)))
    return results

my_iter = range(100000)
run(f, my_iter)

结果是这样的:

16%|██▏           | 15707/100000 [00:00<00:02, 31312.54it/s]

2
谢谢!关键似乎是在tqdm周围加上list(),为什么会这样呢? - dreamflasher
5
那种行为是因为tqdm在执行时运行。Executor.map本身只是一个生成器。 - R4h4
4
这样,你将无法立即获得输出!所以你必须等待整个进程完成,直到你看到完整的结果! - αԋɱҽԃ αмєяιcαη
1
在tqdm中,total参数非常重要。如果没有它,我们无法看到整体进度。 - jdhao
1
为了按顺序获取结果(并相应地更新tqdm),请使用multiprocessing.pool.ThreadPool.imap而不是concurrent.futures.ThreadPoolExecutor.map(它具有一些注意事项)。 - ddelange
显示剩余3条评论

77

接受的答案存在问题,因为ThreadPoolExecutor.map函数必须按照可用结果的顺序生成结果。所以如果第一次调用myfunc恰好是最后一个完成的调用,进度条将会一次性从0%跳到100%,只有当所有调用都完成时才会这样。更好的方法是使用ThreadPoolExecutor.submitas_completed:

import time
import concurrent.futures
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2

def run(f, my_iter):
    l = len(my_iter)
    with tqdm(total=l) as pbar:
        # let's give it some more threads:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = {executor.submit(f, arg): arg for arg in my_iter}
            results = {}
            for future in concurrent.futures.as_completed(futures):
                arg = futures[future]
                results[arg] = future.result()
                pbar.update(1)
    print(321, results[321])

my_iter = range(100000)
run(f, my_iter)

打印:

321 103041

这只是大致的想法。根据my_iter的类型,可能无法直接使用len函数对其进行操作,而不事先将其转换为列表。主要是要使用submitas_completed


1
只是想提一下,通过轻微的修改(将代码移动到 def main() 中),这段代码同样适用于 ProcessPoolExecutor,如果 f(x) 实际上进行了计算,那么它可以更快,因为它不受全局解释器锁的影响。 - leopold.talirz
4
有人刚问了我,这里是为ProcessPoolExecutor调整的示例代码 https://gist.github.com/ltalirz/9220946c5c9fd920a1a2d81ce7375c47 - leopold.talirz
2
这会阻止进度条的时间更新,有没有办法修复它? - Miguel Pinheiro
只需调用update(0)即可更新时间。 - Miguel Pinheiro
因为我花了太长时间才弄明白这个问题:确保你的as_completed()调用在执行器上下文中。捂脸 - undefined
显示剩余3条评论

6

我认为最简单的方法是:

with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(tqdm(executor.map(myfunc, range(len(my_array))), total=len(my_array)))

3

我尝试了这个例子,但进度条仍然失败了,然后我发现了这篇文章,它似乎是一个简短的有用方法:

def tqdm_parallel_map(fn, *iterables):
    """ use tqdm to show progress"""
    executor = concurrent.futures.ProcessPoolExecutor()
    futures_list = []
    for iterable in iterables:
        futures_list += [executor.submit(fn, i) for i in iterable]
    for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list)):
        yield f.result()


def multi_cpu_dispatcher_process_tqdm(data_list, single_job_fn):
    """ multi cpu dispatcher """
    output = []
    for result in tqdm_parallel_map(single_job_fn, data_list):
        output += result
    return output

1

我发现使用tqdmupdate()方法更直观,我们保持人类可读的结构:

with tqdm(total=len(mylist)) as progress:                         
    with ThreadPoolExecutor() as executor:
        for __ in executor.map(fun, mylist):
            progress.update() # We update the progress bar each time that a job finish

由于我不关心fun的输出,所以我使用__作为临时变量。


在Python中,下划线(_)是一个可丢弃的变量,而不是双下划线(__)。 - undefined
@Emsi 每个有效的变量名都可以是一个临时变量。此外,Python会将shell中的最后一个返回值存储在_中。这在调试程序时有时很有用,所以我总是使用__ - undefined
按照惯例,使用下划线(_)是因为它经常被覆盖,因此不会永久保留,也不会导致垃圾回收问题。请参考: https://stackoverflow.com/questions/5893163/what-is-the-purpose-of-the-single-underscore-variable-in-python - undefined

0
只是对已接受答案的一个补充:
# works
with concurrent.futures.ThreadPoolExecutor() as executor:
   futures = executor.map(f, my_iter)
   result = list(tqdm(futures), total=len(my_iter))

# does NOT work (only updates at the very end)
with concurrent.futures.ThreadPoolExecutor() as executor:
   futures = executor.map(f, my_iter)
result = list(tqdm(futures), total=len(my_iter))

有道理,但我可能不是唯一一个尝试过第二种方法的人...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接