如何重用多进程池?

7
下面是我现在的代码。它似乎运行得很好。然而,我并不完全理解它。我认为如果没有 .join(),我会冒着代码在池完成执行之前进入下一个 for 循环的风险。难道我们不需要那三行被注释掉的代码吗?
另一方面,如果我选择使用 .close().join() 的方式,有没有办法重新打开那个已关闭的池,而不是每次都使用 Pool(6)
import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time


def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)


if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x,2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)
1个回答

9

建立一个工作池是相对昂贵的,因此应该(如果可能的话)仅在脚本开始时完成一次。

pool.map 命令会阻塞直到所有任务都完成。在完成后,它会返回结果列表。除非 mesh_subset 已被调用并为每个输入返回了结果,否则它无法执行此操作。相比之下,像 pool.apply_async 这样的方法不会阻塞。 apply_async 返回一个 ApplyResult 对象,其中包含一个 get 方法,该方法会阻塞直到从工作进程获取结果。

pool.close 设置工作处理程序的状态 为关闭。这会导致处理程序向工作进程发送终止信号(signal the workers)

pool.join 会阻塞直到所有工作进程已终止。

因此,在使用工作池时,不需要在完成后调用pool.closepool.join,也不应该这样做。一旦工作进程收到终止信号(通过pool.close),就没有办法“重新打开”它们。您需要启动一个新的工作池。


在您的情况下,由于您希望循环等待直到所有任务完成,因此使用pool.apply_async而不是pool.map没有任何优势。但是,如果使用pool.apply_async,您可以通过调用get来获得与之前相同的结果,而无需关闭和重新启动池:

# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population]) for i in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]

当循环完成时,len(population)不会改变。
如果你不希望每个循环都阻塞直到所有任务完成,你可以使用apply_asynccallback特性:
N_pop = len(population)
result = []
for i in range(N_iteration):
    for i in range(N_pop):
        pool.apply_async(mesh_subset, [population]),
                         callback=result.append)
pool.close()
pool.join()
print(result)

现在,当任何一个mesh_subset返回一个return_value时,将调用result.append(return_value)。对apply_async的调用不会阻塞,所以N_iteration * N_pop个任务一次性全部推入pool的任务队列中。但由于池子有6个工人,每次最多只有6个mesh_subset被执行。随着工人完成任务,最先完成任务的工人调用result.append(return_value)。因此,result中的值是无序的。这与pool.map不同,后者返回一个列表,其返回值与相应的参数列表的顺序相同。
除非出现异常,否则result最终将包含N_iteration * N_pop个返回值,一旦所有任务都完成。上面使用了pool.close()pool.join()来等待所有任务完成。

1
谢谢,@unutbu,我只是想测试一下自己是否理解了:如果我使用了pool.wait_async,但仍然不希望代码在所有进程完成运行之前超过某一行,那么我需要执行pool.closepool.join,在这种情况下,我需要在每个循环中重新构建池,即pool = mp.Pool(6)吗? - Indominus
为什么population最终会包含N_iteration * len(population)个值?在我的代码(和意图)中,每次迭代后人口的大小不会改变。每次迭代的操作是:随机抽取几个(默认为5个,请参见我的编辑),并取平均值形成新人口的一个成员,重复此过程,直到新人口的大小与当前人口相同。 - Indominus
我修改了上面的代码,所以回调现在调用result.append而不是population.append--以避免使用population.append会导致的竞争条件。mesh_subset可能会收到更改后的population - unutbu
1
我看到了你的建议修改,但不得不拒绝它,因为它不能按预期工作。由于 pool.apply_async 不会阻塞,调用 population = result 可能会将 population 分配给部分或空列表。 - unutbu
1
故事的寓意是:使用多进程实现高速运算的关键在于让工作进程尽可能地完成更多的工作,减少进程间通信的次数。如果工作进程完成的工作很少(即函数很快就结束了),而且进程间通信很频繁,那么你的多进程代码可能比单进程串行版本的计算还要慢。 - unutbu
显示剩余9条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接