Using click.progressbar with multiprocessing in Python

9

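I have a huge list that I need to process, which takes some time, so I divide it into 4 pieces and process each piece in its own process with some function. Even with four cores it still takes a while to run, so I thought I would add a progress bar to the function, so that it could tell me where each processor is in its part of the list.

My dream was to have something like this: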

erasing close atoms, cpu0  [######..............................]  13%
erasing close atoms, cpu1  [#######.............................]  15%
erasing close atoms, cpu2  [######..............................]  13%
erasing close atoms, cpu3  [######..............................]  14%

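with each bar moving as the loop in its function progresses. Instead, I get a continuous stream of progress-bar output (the original post shows a screenshot of it here), and so on, filling my terminal window.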

Here is the main Python script that calls the function:

from eraseCloseAtoms import *
from readPDB import *
import multiprocessing as mp
from vectorCalc import *

prot, cell = readPDB('file')
atoms = vectorCalc(cell)

output = mp.Queue()

# setup mp to erase grid atoms that are too close to the protein (dmin = 2.5A)
cpuNum = 4
tasks = len(atoms)
rangeSet = [tasks / cpuNum for i in range(cpuNum)]
for i in range(tasks % cpuNum):
    rangeSet[i] += 1

rangeSet = np.array(rangeSet)

processes = []
for c in range(cpuNum):
    na, nb = (int(np.sum(rangeSet[:c] + 1)), int(np.sum(rangeSet[:c + 1])))
    processes.append(mp.Process(target=eraseCloseAtoms, args=(prot, atoms[na:nb], cell, 2.7, 2.5, output)))

for p in processes:
    p.start()

results = [output.get() for p in processes]

for p in processes:
    p.join()

atomsNew = results[0] + results[1] + results[2] + results[3]

And here is the function eraseCloseAtoms():

import numpy as np
import click


def eraseCloseAtoms(protein, atoms, cell, spacing=2, dmin=1.4, output=None):
    print 'just need to erase close atoms'

    if dmin > spacing:
        print 'the spacing needs to be larger than dmin'
        return

    grid = [int(cell[0] / spacing), int(cell[1] / spacing), int(cell[2] / spacing)]

    selected = list(atoms)
    with click.progressbar(length=len(atoms), label='erasing close atoms') as bar:
        for i, atom in enumerate(atoms):
            bar.update(i)
            erased = False
            coord = np.array(atom[6])

            for ix in [-1, 0, 1]:
                if erased:
                    break
                for iy in [-1, 0, 1]:
                    if erased:
                        break
                    for iz in [-1, 0, 1]:
                        if erased:
                            break
                        for j in protein:
                            protCoord = np.array(protein[int(j)][6])
                            trueDist = getMinDist(protCoord, coord, cell, vectors)
                            if trueDist <= dmin:
                                selected.remove(atom)
                                erased = True
                                break
    if output is None:
        return selected
    else:
        output.put(selected)

1
Here is an example repo that may interest you: https://github.com/aaren/multi_progress - Nick Chammas
5 Answers

7

1
Yes, tqdm recently added support for parallel progress bars, as the OP asked for here [http://stackoverflow.com/questions/22811162/python-mulitprocessing-queue-isnt-keeping-all-the-workers-busy/37169157#37169157] and here [https://dev59.com/SHA75IYBdhLWcg3wdIv7#37499872], and it does so without curses or a GUI, using only standard control characters. - gaborous

6

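I see two issues in your code.

The first one explains why your progress bars often show 100% rather than their real progress. You are calling bar.update(i), which advances the bar by i steps, when I think you want to advance it by one step. A better approach is to pass the iterable to the progressbar function and let it do the updating automatically: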

with click.progressbar(atoms, label='erasing close atoms') as bar:
    for atom in bar:
        erased = False
        coord = np.array(atom[6])

        # ...
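
To make the difference between the two calls concrete, here is a minimal sketch. It assumes a click version in which update(n) advances the bar by n steps, and it reads the bar's `pos` attribute, which is an internal counter rather than documented API, so treat that detail as an assumption. The helper name `final_position` is made up for this illustration.

```python
import io

import click


def final_position(step_for_index):
    """Drive a 10-step bar and return its internal position counter."""
    sink = io.StringIO()  # not a TTY, so click keeps the bar hidden
    with click.progressbar(length=10, file=sink) as bar:
        for i in range(10):
            bar.update(step_for_index(i))
        # Assumption: `pos` is an internal attribute of click's bar object.
        return bar.pos


# update(i) adds i steps on every iteration: 0 + 1 + ... + 9
overshoot = final_position(lambda i: i)
# update(1) adds exactly one step per iteration
exact = final_position(lambda i: 1)

print(overshoot, exact)
```

With update(i) the counter runs well past the bar's length, which is consistent with bars that jump to 100% early.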

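However, because of the second issue in your code, this still won't work when several processes iterate at once, each with its own progress bar. The click.progressbar documentation states the following limitation:

No printing must happen or the progress bar will be unintentionally destroyed.

This means that whenever one of your progress bars updates itself, it breaks all of the other active progress bars.

I don't think there is an easy fix for this. Interactively updating multi-line console output is very hard (you basically need curses or a similar "console GUI" library, with support from your OS). The click module does not have that capability; it can only update the current line. Your best hope is probably to extend the click.progressbar design to output several bars in columns, for example: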
CPU1: [######      ] 52%   CPU2: [###        ] 30%    CPU3: [########  ] 84%

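This would take a non-trivial amount of code to get working (especially when the updates come from multiple processes), but it is not completely impractical.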
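
As a rough illustration of that column layout, the sketch below does not extend click at all. It uses only the standard library: worker processes report progress through a multiprocessing.Queue, and the parent process alone rewrites a single console line with "\r". The names `render_row` and `worker`, the labels, and the chunk sizes are all hypothetical.

```python
import multiprocessing as mp
import sys
import time


def render_row(done, totals, width=12):
    """Format every worker's progress as one console line."""
    cells = []
    for name in sorted(totals):
        fraction = done[name] / totals[name] if totals[name] else 1.0
        filled = int(width * fraction)
        bar = "#" * filled + " " * (width - filled)
        cells.append(f"{name}: [{bar}] {int(100 * fraction):3d}%")
    return "   ".join(cells)


def worker(name, items, queue):
    """Hypothetical stand-in for the real per-item work."""
    for _ in items:
        time.sleep(0.01)
        queue.put((name, 1))   # report one finished item to the parent
    queue.put((name, None))    # sentinel: this worker is finished


def main():
    chunks = {"CPU1": range(20), "CPU2": range(30), "CPU3": range(10)}
    totals = {name: len(items) for name, items in chunks.items()}
    done = dict.fromkeys(totals, 0)
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(n, c, queue)) for n, c in chunks.items()]
    for p in procs:
        p.start()

    running = len(procs)
    while running:
        name, step = queue.get()
        if step is None:
            running -= 1
            continue
        done[name] += step
        # Only the parent writes to the terminal; "\r" returns to the line start.
        sys.stdout.write("\r" + render_row(done, totals))
        sys.stdout.flush()
    sys.stdout.write("\n")
    for p in procs:
        p.join()


if __name__ == "__main__":
    main()
```

Because the workers never print, the restriction that nothing else may write to the line while a bar is active is respected.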

1

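For anybody coming to this later: I created the following, which seems to work okay. It overrides click's ProgressBar fairly minimally, although I had to override an entire method just to change a few lines at the bottom of it. It uses \x1b[1A\x1b[2K to clear the progress bars before rewriting them, so it may be environment dependent.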

#!/usr/bin/env python
import time
from typing import Dict

import click
from click._termui_impl import ProgressBar as ClickProgressBar, BEFORE_BAR
from click._compat import term_len


class ProgressBar(ClickProgressBar):
    def render_progress(self, in_collection=False):
        # This is basically a copy of the default render_progress with the addition of in_collection
        # param which is only used at the very bottom to determine how to echo the bar
        from click.termui import get_terminal_size

        if self.is_hidden:
            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.

        if line != self._last_line and not self.is_fast():
            self._last_line = line
            click.echo(line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
        elif in_collection:
            click.echo(self._last_line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()


class ProgressBarCollection(object):
    def __init__(self, bars: Dict[str, ProgressBar], bar_template=None, width=None):
        self.bars = bars
        if bar_template or width:
            for bar in self.bars.values():
                if bar_template:
                    bar.bar_template = bar_template
                if width:
                    bar.width = width

    def __enter__(self):
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.render_finish()

    def render_progress(self, clear=False):
        if clear:
            self._clear_bars()
        for bar in self.bars.values():
            bar.render_progress(in_collection=True)

    def render_finish(self):
        for bar in self.bars.values():
            bar.render_finish()

    def update(self, bar_name: str, n_steps: int):
        self.bars[bar_name].make_step(n_steps)
        self.render_progress(clear=True)

    def _clear_bars(self):
        for _ in range(0, len(self.bars)):
            click.echo('\x1b[1A\x1b[2K', nl=False)


def progressbar_collection(bars: Dict[str, ProgressBar]):
    return ProgressBarCollection(bars, bar_template="%(label)s  [%(bar)s]  %(info)s", width=36)


@click.command()
def cli():
    with click.progressbar(length=10, label='bar 0') as bar:
        for i in range(0, 10):
            time.sleep(1)
            bar.update(1)
    click.echo('------')
    with ProgressBar(iterable=None, length=10, label='bar 1', bar_template="%(label)s  [%(bar)s]  %(info)s") as bar:
        for i in range(0, 10):
            time.sleep(1)
            bar.update(1)
    click.echo('------')
    bar2 = ProgressBar(iterable=None, length=10, label='bar 2')
    bar3 = ProgressBar(iterable=None, length=10, label='bar 3')
    with progressbar_collection({'bar2': bar2, 'bar3': bar3}) as bar_collection:
        for i in range(0, 10):
            time.sleep(1)
            bar_collection.update('bar2', 1)
        for i in range(0, 10):
            time.sleep(1)
            bar_collection.update('bar3', 1)


if __name__ == "__main__":
    cli()
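
The escape sequence that _clear_bars relies on can be tried in isolation, without click. The following is a minimal sketch under the assumption that the terminal honours ANSI control sequences; the `redraw` helper and the sample lines are invented for the illustration.

```python
import io
import sys

CURSOR_UP = "\x1b[1A"    # move the cursor up one line
ERASE_LINE = "\x1b[2K"   # erase the whole current line


def redraw(lines, first, out=sys.stdout):
    """Write `lines`, erasing the previously drawn block unless this is the first draw."""
    if not first:
        # One "up + erase" pair per line drawn last time.
        out.write((CURSOR_UP + ERASE_LINE) * len(lines))
    for line in lines:
        out.write(line + "\n")
    out.flush()


# Capture the raw output in a buffer to see exactly what would reach the terminal.
buffer = io.StringIO()
redraw(["bar2  10%", "bar3   0%"], first=True, out=buffer)
redraw(["bar2  20%", "bar3   0%"], first=False, out=buffer)
```

On a terminal that does not interpret these sequences they appear as literal characters, which is the environment dependence mentioned above.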

0

Maybe not quite what you dreamed of, but you can use imap_unordered together with click.progressbar to integrate it with multiprocessing.

import multiprocessing as mp
import click
import time


def proc(arg):
    time.sleep(arg)
    return True

def main():
    p = mp.Pool(4)
    args = range(4)
    results = p.imap_unordered(proc, args)
    with click.progressbar(results, length=len(args)) as bar:
        for result in bar:
            pass

if __name__ == '__main__':
    main()
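
The same pattern can be adapted to a long list by handing the pool small chunks and advancing the bar by the size of each finished chunk. This is only a sketch: `chunked` and `process_chunk` are hypothetical helpers, and the chunk size of 50 is arbitrary.

```python
import multiprocessing as mp

import click


def chunked(items, size):
    """Split a list into consecutive slices of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def process_chunk(chunk):
    """Hypothetical stand-in for the real work; returns how many items it handled."""
    return len(chunk)


def main():
    items = list(range(1000))
    chunks = chunked(items, 50)
    with mp.Pool(4) as pool, click.progressbar(length=len(items)) as bar:
        # Results arrive in completion order, so the bar moves as soon as any chunk is done.
        for handled in pool.imap_unordered(process_chunk, chunks):
            bar.update(handled)


if __name__ == "__main__":
    main()
```

Smaller chunks give smoother progress at the cost of more inter-process overhead.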

0

If you are okay with having a single progress bar, something like this will work:

import click
import threading
import numpy as np

reallybiglist = []
numthreads = 4

def myfunc(listportion, bar):
    for item in listportion:
        # do a thing
        bar.update(1)

with click.progressbar(length=len(reallybiglist), show_pos=True) as bar:
    threads = []
    # array_split tolerates lengths that do not divide evenly; np.split would raise
    for listportion in np.array_split(reallybiglist, numthreads):
        thread = threading.Thread(target=myfunc, args=(listportion, bar))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()
