在多进程池中运行代码

3

如何处理multiprocessing.Pool()初始化错误?

简而言之,我使用:
pool = multiprocessing.Pool(os.cpu_count(), initializer=init)

def init(): 
   # stop everything if that fails with exception
   # init..

相关问题如何处理multiprocessing.Pool中的初始化错误?

但它没有展示如何干净地退出主进程。

是否可能从池中获取每个Process实例并在它们中的每一个上运行一个函数? 请注意,这不同于pool.map,因为我不知道我的函数将在哪个进程中运行。


这并不是Pool设计支持的内容,例如maxtasksperchild会导致池循环,因此进程不应保持稳定。每个进程是否可以单独失败?即为什么不只是将某些成功函数映射到几个进程上。 - Sam Mason
我该如何确保map在每个进程上运行? - user3599803
1个回答

0

使用队列来监听初始化失败怎么样?

例如:

from multiprocessing import Pool, Manager
from random import random

# an initialisation function that fails 10% of the time
def init(initdone):
    try:
        # fail sometimes!
        assert random() < 0.9
    except Exception as err:
        # record the failure
        initdone.put(err)
    finally:
        # record that initialisation was successful
        initdone.put(None)

# we need a manager to maintain the queue
with Manager() as manager:
    # somewhere to store initialisation state
    initresult = manager.Queue()

    # number of workers we want to wait for
    nprocs = 5

    with Pool(nprocs, initializer=init, initargs=(initresult,)) as pool:
        # wait for initializers to run
        for i in range(nprocs):
            res = initresult.get()
            # reraise if it failed (or whatever logic is appropriate)
            if res is not None:
                raise res

        # do something now we've got this pool set up
        print(sum(pool.map(int, range(20))))

我应该指出,multiprocessing 可以重新启动失败的工作进程,因此您可能会在队列中获得多于 nprocs 个条目。在 OP 的情况下,这可能是可以接受的,因为他们说他们想在这种情况下中止,但在其他情况下可能不是这样。

我想不到任何方式可以使队列中的条目少于 nprocs,如果您发现,请评论!请注意,我故意没有在 init 中捕获 BaseException,因为代码中的 multiprocessing(以及其他一些代码)已经处理了它,并且让它做正确的事情似乎是明智的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接