如何在使用joblib进行并行执行时使用tqdm?

73

我想使用joblib并行运行一个函数,并等待所有并行节点完成,就像这个例子一样:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

但是,我想要执行过程可以像tqdm一样在单个进度条中显示,显示已完成的作业数量。

你会如何做到这一点?


也许这个网站可以帮到你:http://danshiebler.com/2016-09-14-parallel-progress-bar/ - Tejas Shetty
请查看下面 niedakh 的翻译! - mat.viguier
9个回答

59

只需将 range(10) 放入 tqdm(...) 中即可!对你来说,这可能看起来太好以至于不可信,但它确实有效(在我的机器上):

from math import sqrt
from joblib import Parallel, delayed  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))

20
只有在过程开始时,而不是完成时,才会显示进度:Parallel(n_jobs=10)(delayed(time.sleep)(i ** 2) for i in tqdm(range(10))) - Davidmh
2
它可以工作,但不适用于字符串列表等... 我也尝试过使用 iter 包装列表... - curious95
@curious95 尝试将列表放入生成器中,以下方法对我有效:`from math import sqrt from joblib import Parallel, delayed import multiprocessing
from tqdm import tqdm
rng = range(100000) rng = ['a','b','c','d'] for j in range(20): rng += rng def get_rng(): i = 0 for i in range(len(rng)): yield rng[i]result = Parallel(n_jobs=2)(delayed(sqrt)(len(i) ** 2) for i in tqdm(get_rng()))`
- tyrex
2
在另一个问题中,有一个非常优雅的解决方案来解决这个问题。 - Christian Steinmeyer
5
这行代码行不通,tqdm 会立即到达 100%。 - anilbey

48

我创建了pqdm,这是一个使用并发 futures 的并行 tqdm 包装器,让你轻松完成任务,欢迎尝试!

安装方法:

pip install pqdm

并使用

from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work

def square(a):
    return a*a

result = pqdm(args, square, n_jobs=2)

很不幸,这对我来说失败了。我不确定为什么,但看起来 pqdm 不会等到函数调用结束。我现在没有时间创建一个 MWE。还是谢谢你的努力(和 +1)。 - Yair Daon
@YairDaon 或许可以尝试使用有界执行器,尝试在 pqdm 中添加 bounded=True - niedakh
它能在列表推导式中工作吗? - Sterling
有趣的库。您能告诉我们它与在 https://github.com/swansonk14/p_tqdm 找到的p_tqdm有何不同或更好吗? 似乎后者更为发达。 - xApple
pqdm做了类似的事情,但pqdm不依赖于pathos,您可以轻松地交换tqdm变体(例如使用slack_tqdm或discord_tqdm而不是主要的tqdm.auto)。不确定您所说的更发达是什么意思,我喜欢简单任务的简单工具,因此pqdm尽可能轻量级,尽可能少依赖。 - niedakh
显示剩余2条评论

26
修改nth的出色答案以允许动态标志使用TQDM或不使用,并提前指定总数,以便状态栏正确填充。
from tqdm.auto import tqdm
from joblib import Parallel

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

对于任何像我一样想知道的人,将此作为一个可选功能添加到Joblib本身已在问题跟踪器上讨论过:https://github.com/joblib/joblib/issues/972 - shadowtalker

18

如上所述,仅包装传递给joblib.Parallel()的可迭代对象的解决方案并不能真正地监视执行进度。因此,我建议子类化Parallel并覆盖print_progress()方法,方法如下:

import joblib
from tqdm.auto import tqdm

class ProgressParallel(joblib.Parallel):
    def __call__(self, *args, **kwargs):
        with tqdm() as self._pbar:
            return joblib.Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

11
不需要安装额外的包。您可以在contrib.concurrent中使用tqdm的原生支持: https://tqdm.github.io/docs/contrib.concurrent/
from tqdm.contrib.concurrent import process_map
# If you want threads instead:
# from tqdm.contrib.concurrent import thread_map
import time

args = range(5)

def square(a):
    time.sleep(a)
    return a*a

result = process_map(square, args, max_workers=2)

1
我不知道为什么没有赞。这是做这种事情的最佳答案之一。 - El Stun
1
迄今为止最简单的解决方案 - jerpint

7
这里有一个可能的解决方法:
def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progessbar(seq, total=None):
    step = 1
    tick = time.time()
    while True:
        time_diff = time.time()-tick
        avg_speed = time_diff/step
        total_str = 'of %n' % total if total else ''
        print('step', step, '%.2f' % time_diff, 
              'avg: %.2f iter/sec' % avg_speed, total_str)
        step += 1
        yield next(seq)

all_bar_funcs = {
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progessbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,
}

def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

这是一个漫步,但进度条只在任务被分派时更新。更好的时间更新进度条是任务完成的时间。 - Vision

6
我创建了 tqdm_joblib 来解决这个问题。
安装:pip install tqdm-joblib 从自述文件中:
简单的代码片段可以从https://dev59.com/FF8f5IYBdhLWcg3wJf0H#58936697复制,以供简单重用。
from joblib import Parallel, delayed
from tqdm_joblib import tqdm_joblib

with tqdm_joblib(desc="My calculation", total=10) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

这绝对是最简单的方法。作为建议,您可以编辑您的答案并提及如何安装它:从https://pypi.org/project/tqdm-joblib/安装`pip install tqdm-job`。 - Henry Navarro

2
如果你的问题由许多部分组成,可以将这些部分分成k个子组,每个子组并行运行,并在之间更新进度条,从而导致进度的k次更新。
下面是文档中的示例:
>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers


3
这如何回答关于“单个进度条”的问题? - Nikos Alexandris
1
这绝对不是关于进度条的问题的答案。 - ForceBru

1
其他答案,包括 user394430nth 的课程对我都没有用。
但是来自similar questionthis answer完美地解决了我的问题。为了方便重新发布。
import contextlib
import joblib
from tqdm import tqdm

@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
    """Context manager to patch joblib to report into tqdm progress bar given as argument"""
    class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    old_batch_callback = joblib.parallel.BatchCompletionCallBack
    joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        joblib.parallel.BatchCompletionCallBack = old_batch_callback
        tqdm_object.close()

然后将其包装为上下文管理器。
from math import sqrt
from joblib import Parallel, delayed

with tqdm_joblib(tqdm(desc="My calculation", total=10)) as progress_bar:
    Parallel(n_jobs=16)(delayed(sqrt)(i**2) for i in range(10))

与以下版本兼容:
  • joblib - 1.2.0
  • tqdm - 4.64.1
  • python - 3.9.13

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接