Python多进程-理解每个进程的进度的最佳方法

3
我想了解我的进程进展情况。目前我使用的方法不太有效。以下是一个 mwe:
import time
from multiprocessing import Pool as ProcessPool
import progressbar
import random

def some_random_calculation(n):
    with progressbar.ProgressBar(max_value=n) as bar:
        for i in range(0,n):
            time.sleep(1)
            bar.update(i)

if __name__=='__main__':

    arguments = [random.randint(4,10) for i in range(4)]

    pool = ProcessPool(4)
    results = pool.map_async(some_random_calculation, arguments)
    print(results.get())
    pool.close() 
    pool.join()

在这种情况下,我正在使用progressbar2,但是当有超过1个进程时,输出会在同一行上不断更新: 从图片可以看出,进度条以排序顺序呈现,因为在第一个进度条结束后,其他进程会创建一个新的进度条。当有多个进程时,单个进度条会在同一行上更新。
我正在寻找解决方法,希望能够动态更新n个进度条。然而,也许有更聪明的方法来获取不同进程的进度。有什么建议吗?

3
应该使用一个调节器更新输出,而不是每个线程分别更新。 - Torxed
我认为这个问题,但是我不知道该去哪里寻找。此外,我想限制“主要”和“子进程”之间的连接。@Torxed - Guido Muscioni
1个回答

1
这并不完美,如果您想要完全正确,主题相当复杂。但有一件事是确定的,您应该从子进程外部监视进度。
最快、可能也是最简单的方法是拥有一个调用函数返回状态的功能,而外部的管理者可以让用户了解进展情况。它可能看起来像这样:
import os, signal
from threading import Thread, enumerate as t_enumerate
from time import time, sleep
from random import random

clear = lambda: os.system('cls' if os.name=='nt' else 'clear')

def sig_handler(signal, frame):
    for t in t_enumerate():
        if t.getName() != 'MainThread':
            t.stop()
    exit(0)
signal.signal(signal.SIGINT, sig_handler)

class worker(Thread):
    def __init__(self, init_value=0):
        Thread.__init__(self)
        self.init_value = init_value
        self.progress = 0
        self.run_state = True
        self.start() # Start ourselves instead of from outside.

    def poll(self):
        return self.progress

    def stop(self):
        self.run_state = False

    def run(self):
        main_thread = None
        for t in t_enumerate():
            if t.getName() == 'MainThread':
                main_thread = t
                break

        while main_thread and self.run_state and main_thread.isAlive():
            for i in range(0, 100):
                self.init_value *= i
                self.progress = i
                sleep(random())
            break # Yea kinda unessecary while loop. meh..

workers = [worker(0) for i in range(4)]

while len(t_enumerate()) > 1:
    clear()
    for index, worker_handle in enumerate(workers):
        progress = worker_handle.poll()
        print(f'Thread {index} is at {progress}/100.')
    sleep(1)

另一种方法是每个线程在打印前获取线程池上的锁。但这会增加复杂性,首先,它们都需要在打印时同步,以便它们不会随意获取锁来打印,而您可能正在输出过程的其他部分中打印其他内容。或者它们会以错误的顺序打印,或者您需要跟踪应该回溯重写的哪一行...
可能会有一个更好的答案的线程专家,但这是我的建议。只需添加轮询函数,进行组合状态更新,并使用非常有限的处理能力调用每个线程。除非您有成千上万个线程,否则多次调用不会对性能产生任何影响。

谢谢您的回答,我在运行它时遇到了问题。它会闪现一些错误信息,然后打印出 Thread {index} is at 0/100 - Guido Muscioni
@GuidoMuscioni 你在使用Python2吗?移除 clear() 来查看错误信息。 - Torxed
变量名有问题,我已经修复了。现在它可以正常工作了,我认为没有太多其他的选择。我想加上分隔线,但是它们仍然会一起打印出来。 - Guido Muscioni
@GuidoMuscioni 是的,注意到了,编辑得不错。我不知道在我执行代码和复制粘贴之间如何发生这种变化,但显然是我的问题。可能有点累了呵呵。你可以打印进度条,但应该创建一个包装类或使用你在示例中提到的那个:progressbar.ProgressBar() - Torxed

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接