多进程:使用tqdm显示进度条

224
为了让我的代码更加“pythonic”且更快,我使用multiprocessing和一个map函数来将a)函数和b)迭代范围发送给它。但是,针对嵌入式方案(即直接在范围上调用tqdm.tqdm(range(0, 30)) )无法与多进程一起使用(如下面的代码所示)。进度条显示从0到100%(当Python读取代码时?),但它不会指示Map功能的实际进度。如何显示指示'map'函数步骤的进度条?
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

欢迎提供任何帮助或建议...


你能发布进度条的代码片段吗? - Alex
7
对于正在寻找.starmap()解决方案的人:这里提供了一个修补程序,可以添加.istarmap()Pool,它也能够与tqdm一起使用。 - Darkonaut
11个回答

241

使用 imap 替换 map,它会返回一个已处理值的迭代器。

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

28
list()语句将等待迭代器结束。由于tqdm无法确定迭代的长度,因此还需要total=。 - hkyi
35
starmap() 有类似的解决方案吗? - mr.tarsa
4
for i in tqdm.tqdm(...): pass 可能更直观,相比之下 list(tqdm.tqdm) - savfod
7
当使用特定的chunk_size时,p.imap的行为会受到影响。能否让tqdm在每次迭代而不是每个块更新进度条? - huangbiubiu
6
我认为这个解决方案不能正常工作。它几乎一直停留在0%,突然间就跳到了100%。 - Carlos Souza
显示剩余8条评论

223

很抱歉晚了,但是如果你所需要的仅仅是一个并发映射表,我在 tqdm>=4.42.0 中添加了这个功能:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

参考资料: https://tqdm.github.io/docs/contrib.concurrent/https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

它支持max_workerschunksize,你还可以轻松地从process_map转换为thread_map


3
@Xudong process_map 函数创建、运行、关闭/合并并返回一个列表。 - casper.dcl
2
太好了!我很高兴找到它。还有一个问题,当我在jupyter笔记本中使用它时,它的效果不是很好。我知道有一个tqdm.notebook,有没有办法将它们合并? - jlconlin
2
这将无条件地复制迭代参数,而其他方法似乎是按需复制。 - Passer By
4
嗯。。。进度条卡在零的时候完成了。 - Sterling
5
@jlconlin @Vladimir Vargas ,今天在 Jupyter Notebook 中,如果我像这样执行 thread_map(fn, *iterables, tqdm_class=tqdm.notebook.tqdm, max_workers=12) 操作,我不会遇到任何问题。 - snooze92
显示剩余11条评论

101

找到解决方案。请注意!由于多进程,估计时间(每个循环迭代,总时间等)可能不稳定,但进度条完美工作。

注意:只有在Python 3.3+中才能使用Pool上下文管理器。

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()

5
这里是否需要第二个/内部的tqdm调用? - shadowtalker
7
问题中返回的"r"代表了 _foo(my_number) 的输出结果是什么? - Likak
9
starmap() 是否有类似的解决方案? - mr.tarsa
4
@shadowtalker - 似乎不需要分号就能工作。无论如何,“imap_unordered”在这里非常关键,它提供了最佳性能和最佳进度条估计。 - Tomasz Gandor
4
这个解决方案怎样获取结果? - Carlos Souza
显示剩余3条评论

36

您可以使用p_tqdm代替。

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

1
这个非常好用,并且非常容易进行“pip安装”。对于我大部分的需求,它正在取代tqdm。 - crypdick
谢谢Victor ;) - Gabriel Romon
3
p_tqdm 仅限于 multiprocessing.Pool,不适用于线程。 - pateheo
1
@VictorWang,是的,请在num_cpus中使用它,就像这样=> p_map(_foo, list(range(0, 30)), num_cpus=5) - Sreekant Shenoy
1
@VandanRevanur 您可以将kwargs传递给p_tqdm,并将它们转发到tqdm,就像这样:https://github.com/swansonk14/p_tqdm/issues/5 - Victor Quach
显示剩余2条评论

10

基于Xavi Martínez的回答,我编写了函数imap_unordered_bar。它可以与imap_unordered以相同的方式使用,唯一的区别是会显示一个处理进度条。

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

4
如需更新同一行,请使用此代码,它将在每个步骤上重绘该行的内容:print('\r' + "更新后的内容", end='')请注意,'\r' 能够将光标移至行首,以便在更新行时覆盖旧内容。 - misantroop
在我的情况下(Windows/Powershell)的解决方案是:Colorama。 - misantroop
1
“pbar.close()不需要,因为在with的终止时它会自动关闭”,就像Sagar在@scipy的答案中所做的评论一样。 - Tejas Shetty

7

对于使用apply_async方法的进度条,我们可以使用以下代码,如下面链接中所建议的:

https://github.com/tqdm/tqdm/issues/484

import time
import random
from multiprocessing import Pool
from tqdm import tqdm

def myfunc(a):
    time.sleep(random.random())
    return a ** 2

pool = Pool(2)
pbar = tqdm(total=100)

def update(*a):
    pbar.update()

for i in range(pbar.total):
    pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()

7
import multiprocessing as mp
import tqdm


iterable = ... 
num_cpu = mp.cpu_count() - 2 # dont use all cpus.


def func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(num_cpu) as p:
        list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))

1

根据“user17242583”的回答,我创建了以下函数。它应该和Pool.map一样快,并且结果总是有序的。此外,您可以传递任意数量的参数给您的函数,而不仅仅是单个可迭代对象。

from multiprocessing import Pool
from functools import partial
from tqdm import tqdm


def imap_tqdm(function, iterable, processes, chunksize=1, desc=None, disable=False, **kwargs):
    """
    Run a function in parallel with a tqdm progress bar and an arbitrary number of arguments.
    Results are always ordered and the performance should be the same as of Pool.map.
    :param function: The function that should be parallelized.
    :param iterable: The iterable passed to the function.
    :param processes: The number of processes used for the parallelization.
    :param chunksize: The iterable is based on the chunk size chopped into chunks and submitted to the process pool as separate tasks.
    :param desc: The description displayed by tqdm in the progress bar.
    :param disable: Disables the tqdm progress bar.
    :param kwargs: Any additional arguments that should be passed to the function.
    """
    if kwargs:
        function_wrapper = partial(_wrapper, function=function, **kwargs)
    else:
        function_wrapper = partial(_wrapper, function=function)

    results = [None] * len(iterable)
    with Pool(processes=processes) as p:
        with tqdm(desc=desc, total=len(iterable), disable=disable) as pbar:
            for i, result in p.imap_unordered(function_wrapper, enumerate(iterable), chunksize=chunksize):
                results[i] = result
                pbar.update()
    return results


def _wrapper(enum_iterable, function, **kwargs):
    i = enum_iterable[0]
    result = function(enum_iterable[1], **kwargs)
    return i, result

1
这里是我的建议,当你需要从并行执行的函数中获取结果时。这个函数会做一些事情(我有另一篇文章对其进行了更详细的解释),但关键点在于有一个待处理任务队列和一个已完成任务队列。当工作人员完成待处理队列中的每个任务时,他们将结果添加到已完成任务队列中。你可以使用tqdm进度条包装对已完成任务队列的检查。我不会在此处放置do_work()函数的实现,因为本文的重点是监视已完成任务队列,并在每次有结果时更新进度条。
def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

0
tqdm发布了自己简单、优雅的APIs以支持并发。
我提供以下代码片段作为一个直观的示例,以说明多线程。
from tqdm.contrib.concurrent import thread_map

def f(row):
    x, y = row
    time.sleep(1)  # to visualize the progress

thread_map(f, [(x, y) for x, y in zip(range(1000), range(1000))])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接