`multiprocessing`和`concurrent.futures`中的最大工作进程数

4
在Python 3.8中,concurrent.futures.ProcessPoolExecutor已更新以限制在Windows上可使用的最大工作程序(进程)数量为61。关于原因,请参见此处此处,但据我所知:
  • 在Windows上,multiprocessing调用Windows API函数WaitForMultipleObjects,用于等待进程完成。它最多可以等待63个对象,减去结果队列读取器和线程唤醒读取器,因此限制为61。(即Windows使用一个线程跟踪进程)。

(请参见此SO问题

然而,multiprocessing仍然使用os.cpu_count()。它首先抛出一个Value Error,但随后继续使用我的CPU核心的100%。例如:

Exception in thread Thread-N:
Traceback (most recent call last):
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 519, in _handle_workers       
    cls._wait_for_updates(current_sentinels, change_notifier)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 499, in _wait_for_updates     
    wait(sentinels, timeout=timeout)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 879, in wait
    ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 811, in _exhaustive_wait
    res = _winapi.WaitForMultipleObjects(L, False, timeout)
ValueError: need at most 63 handles, got a sequence of length 98

我的机器有96个核心。这个“错误”真的是一个错误吗?如果不是,我应该只使用multiprocessing模块而不是concurrent.futures模块吗?后者会限制我的CPU使用率在61个核心。

编辑:我怀疑这是一个错误,因为我认为multiprocessing将继续等待抛出错误的进程完成。如果我不限制核心数,情况似乎是这样的(程序在CPU使用率下降后就会停滞)。但是,我不确定这是否真的是一个错误。

1个回答

2

你的问题非常好。看起来代码中似乎是一个无法恢复的错误。但对我来说,ThreadPoolExecutor 中有限制 Windows 下线程池大小为 61 的代码,而 multiprocessing.Pool 类中没有强制限制似乎是不可理解的。无论如何,你可以用以下程序轻松检查。如果它没有打印 Done! 并挂起,那么肯定存在问题,如果你使用 multiprocessing.Pool,则应该明确限制池的大小:

import multiprocessing

def worker(x):
    return x ** 2

def main():
    pool = multiprocessing.Pool(96)
    results = pool.map(worker, range(96))
    assert len(results) == 96
    pool.close()
    pool.join()
    print('Done!')

if __name__ == '__main__':
    main()

但是你的程序卡住了这一事实相当明显,上面的程序也会卡住,我怀疑你甚至无法到达assert语句。无论如何,使用大于61的池大小都不可靠。


非常感谢!我同意。这是一个有趣的“问题”,因为我不确定为什么 multiprocessing.Pool 没有相应地进行调整。 - A. Hendry

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接