多进程队列已满

16

我正在使用concurrent.futures实现多进程。 我收到了一个queue.Full错误,这很奇怪,因为我只分配了10个作业。

A_list = [np.random.rand(2000, 2000) for i in range(10)]

with ProcessPoolExecutor() as pool:
    pool.map(np.linalg.svd, A_list)

错误:

Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 921, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 869, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 251, in _queue_management_worker
    shutdown_worker()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/concurrent/futures/process.py", line 209, in shutdown_worker
    call_queue.put_nowait(None)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 131, in put_nowait
    return self.put(obj, False)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 82, in put
    raise Full
queue.Full

如果您使用一个小得多的数组,是否会出现相同的错误? - 101
我在较小的数组上没有出现错误。最大的大小是约200x200。 - rohanp
1
如果由于工作进程崩溃而导致“池”破裂,那么调用shutdown_worker的失败只会运行一次-因此,您需要追踪的真正问题是为什么会发生这种情况。 - dano
2个回答

21

简短回答
我认为管道大小限制是根本原因。你所能做的就是将数据分成更小的块并进行迭代处理。这意味着你可能需要找到一种新的算法,可以在每次处理2000x2000数组的一小部分来找到奇异值组合。

详情
首先得明确一件事情:你正在处理大量信息。仅因为你只处理了十个项目并不意味着它是微不足道的。每个项目都是一个包含4,000,000个浮点数的2000x2000数组,每个浮点数通常是64位,因此每个数组大约需要244MB,再加上在Numpy的ndarrays中附带的其他数据。

ProcessPoolExecutor通过启动一个单独的线程来管理工作进程。管理线程使用一个multiprocesing.Queue将任务传递给工作进程,称为_call_queue。这些multiprocessing.Queue实际上只是管道的高级包装器,而你试图传递给工作进程的ndarrays可能太大而无法正确处理。

阅读Python Issue 8426可以看出,即使可以查找操作系统的一些名义上的管道大小限制,确定管道实际大小也很困难。有太多变量使它不简单。即使从队列中获取事物的顺序也可能导致底层管道中的竞争条件触发奇怪的错误。

我怀疑你的一个工作者正在从它的_call_queue获得不完整或损坏的对象,因为该队列的管道充满了巨大的对象。那个工作者以不洁的方式死亡,工作队列管理器检测到此故障,因此放弃工作并告诉其余的工作者退出。但它通过_call_queue向它们传递毒丸来完成这个过程,而仍然充满你的巨大ndarrays。这就是为什么你会得到完整的队列异常 - 你的数据填满了队列,然后管理线程尝试使用相同的队列将控制消息传递给其他工作者。

我认为这是在程序中混合数据和控制流可能存在的潜在危险的经典例子。你的大数据不仅阻塞了更多的数据被工作者接收,而且还阻塞了管理器与工作者之间的控制通信,因为它们使用相同的路径。

我无法重现你的失败,所以我不能确定所有这些是否正确。但是,你可以使用一个200x200的数组(~2.5MB)让这段代码工作似乎支持了这个理论。名义管道大小限制似乎以KB或最多几MB为单位,具体取决于操作系统和架构。如果消费者不断接收数据,则并非所有2.5MB都需要同时适合于该管道中,所以可以通过管道连续获得合理的数据上限。


12

我最近在调试一个Python3.6程序,该程序通过管道发送各种GB级别的数据时,偶然发现了这一问题。这就是我发现的(希望能节省其他人的时间!)。

skrrgwasme所说,如果队列管理器无法在发送毒丸时获取信号量,则会引发队列已满错误。对信号量的获取调用是非阻塞的,这会导致管理器失败(由于数据和控制流共享同一 Queue,因此无法发送“控制”命令)。请注意,上面的链接是针对Python 3.6.0的。

现在我想知道为什么我的队列管理器会发送毒丸。必须还有其他的故障!显然,某些异常情况已经发生(在其他子进程中?在父进程中?),而队列管理器正在尝试清理并关闭所有子进程。此时,我对查找根本原因很感兴趣。

调试根本原因

我最初尝试在子进程中记录所有异常,但显然那里没有发生明确的错误。来自问题3895

请注意,当结果在未反序列化时失败时,multiprocessing.Pool也会出现故障。

看起来,在py36中,多进程模块存在问题,它无法正确捕获和处理序列化错误。

不幸的是,由于时间限制,我没有亲自复制和验证问题,而是更喜欢采取行动点和更好的编程实践(不要通过管道发送所有数据:) 这里有一些想法:

  1. 尝试对应该通过管道运行的数据进行pickle。由于我的数据非常庞大(数百GB)且时间紧迫,我没有找到哪些记录无法序列化。
  2. 将调试器放入python3.6中并打印原始异常。

行动点

  1. 如果可能的话,请重新设计程序,使其通过管道发送更少的数据。

  2. 阅读问题3895后,看起来问题出现在pickling错误上。一个替代方案(也是良好的编程实践)可以使用不同的方式传输数据。例如,子进程可以写入文件并返回路径给父进程(这只会是一个小字符串,可能只有几个字节)。

  3. 等待未来的Python版本。显然,这已经在Python版本标签v3.7.0b3中修复了问题3895的上下文。 Full异常将在shutdown_worker处理。撰写本文时,当前的Python维护版本为3.6.5。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接