Python多进程池。当一个工作进程确定没有更多的工作需要完成时,如何退出脚本?

10
mp.set_start_method('spawn')
total_count = Counter(0)
pool = mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc)    

pool.map(part_crack_helper, product(seed_str, repeat=4))
pool.close()
pool.join()

我有一个工作进程池,它需要执行一项任务并找到一个解决方案。因此,当其中一个工作进程找到解决方案时,我想停止所有进程。

我想过一种方法就是调用 sys.exit()。但似乎不起作用,因为其他进程在运行。

另一种方法是检查每个进程调用的返回值(part_crack_helper函数的返回值),并在该进程上调用terminate。然而,在使用map函数时,我不知道如何做到这一点。

我应该如何实现这个功能?


看一下 pool.apply_asyncpool.terminate - Bi Rico
2个回答

5
您可以使用 Pool.apply_async 中的回调函数。类似下面这样的代码可以帮助您完成工作。
from multiprocessing import Pool


def part_crack_helper(args):
    solution = do_job(args)
    if solution:
        return True
    else:
        return False


class Worker():
    def __init__(self, workers, initializer, initargs):
        self.pool = Pool(processes=workers, 
                         initializer=initializer, 
                         initargs=initargs)

    def callback(self, result):
        if result:
            print("Solution found! Yay!")
            self.pool.terminate()

    def do_job(self):
        for args in product(seed_str, repeat=4):
            self.pool.apply_async(part_crack_helper, 
                                  args=args, 
                                  callback=self.callback)

        self.pool.close()
        self.pool.join()
        print("good bye")


w = Worker(num_proc, init, [total_count])
w.do_job()

谢谢!还没有尝试过,但看起来很有前途! - whiteSkar

1
如果您愿意使用另一个库,您可以使用Pebble以以下方式解决它。这种解决方案的优点是您还可以指定超时时间。这意味着,如果有一个成功的工作程序或者时间用尽了,程序将结束:
from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time

pool = ProcessPool()

def my_function(args):
    print("running " + str(args))
    time.sleep((args + 1) * 30)
    print("process won:" + str(args))
    return True


start_time = time.time()

future = pool.map(my_function, range(4), timeout=65)
iterator = future.result()

while True:
    try:
        result = next(iterator)
        if result:
            pool.stop()
            pool.join(timeout=0)
            break
    except StopIteration:
        break
    except TimeoutError as error:
        print("function took longer than %d seconds" % error.args[1])
    except ProcessExpired as error:
        print("%s. Exit code: %d" % (error, error.exitcode))
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)  # Python's traceback of remote process

print("whole time: " + str(time.time() - start_time))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接