Multiprocessing queue fails intermittently. Bug in Python?


Python's multiprocessing.Queue fails intermittently, and I can't figure out why. Is this a bug in Python or a problem with my script?

Minimal failing script

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()

proc.join()

I am testing this with CPython 3.6.6 on Debian. It also fails with the docker python:3.7.0-alpine image.

docker run --rm -v "${PWD}/test.py:/test.py" \
    python:3-alpine python3 /test.py

The script above sometimes fails with a BrokenPipeError:

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 240, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Test harness

Since this is intermittent, I wrote a shell script that invokes it many times and counts the failures.

#!/bin/sh
total=10

successes=0
for i in `seq ${total}`
do
    if ! docker run --rm -v "${PWD}/test.py:/test.py" python:3-alpine \
         python3 test.py 2>&1 \
         | grep --silent BrokenPipeError
    then
        successes=$(expr ${successes} + 1)
    fi
done
python3 -c "print(${successes} / ${total})"

This usually prints a fraction, e.g. 0.2, indicating intermittent failure.

Timing adjustments

If I insert a time.sleep(0.01) before queue.close(), it works consistently. I noticed in the source code that writing is done in its own thread. I think that if the writing thread is still trying to write the data while the other threads close the queue, that causes the error.
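
For reference, here is a minimal sketch (an editor's illustration, not the script above) of a variant that avoids relying on timing: the parent consumes the item before closing, and the worker waits for the feeder thread with queue.join_thread() after close().

import multiprocessing

def worker(queue):
    queue.put('abcdefghijklmnop')
    queue.close()        # no more puts from this process
    queue.join_thread()  # wait for the feeder thread to flush the pipe

if __name__ == '__main__':
    queue = multiprocessing.Queue(maxsize=10)
    proc = multiprocessing.Process(target=worker, args=(queue,))
    proc.start()
    print(queue.get())   # consume the item so the feeder's write can complete
    queue.close()
    proc.join()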

Debug logs

By uncommenting the first few lines, I can trace the execution for failures and for successes.

Failure:
[DEBUG/MainProcess] created semlock with handle 140480257941504
[DEBUG/MainProcess] created semlock with handle 140480257937408
[DEBUG/MainProcess] created semlock with handle 140480257933312
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

"Success" (actually a silent failure; I can only reproduce this on Python 3.6):
[DEBUG/MainProcess] created semlock with handle 139710276231168
[DEBUG/MainProcess] created semlock with handle 139710276227072
[DEBUG/MainProcess] created semlock with handle 139710276222976
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[INFO/Process-1] error in queue thread: [Errno 32] Broken pipe
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Real success (with time.sleep(0.01)):

[DEBUG/MainProcess] created semlock with handle 140283921616896
[DEBUG/MainProcess] created semlock with handle 140283921612800
[DEBUG/MainProcess] created semlock with handle 140283921608704
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

The difference seems to be that, in the genuinely successful case, the feeder thread receives the sentinel object before the atexit handlers run.

Odd... of the possible failure modes here, I would have expected a consistent deadlock, if anything. - user2357112
If I comment out the worker's queue.close(), it hangs instead. - charmoniumQ
It turns out it can also fail for small object sizes. - charmoniumQ
You could try an alternative multiprocessing queue implementation, queue = multiprocessing.Manager().Queue(), which in my experience is more stable (see the sketch after these comments). - natonomo
Still a problem in Python 3.9 and 3.10-rc. The issue goes away if you "empty the queue" before exiting, but that is not really a "fix". - Andy
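
A minimal sketch of the Manager-backed alternative natonomo suggests above (an editor's illustration, not code from the thread). The proxy queue lives in a separate manager process, so there is no per-process feeder thread racing against queue.close() in the worker:

import multiprocessing

def worker(queue):
    queue.put('abcdefghijklmnop')

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        queue = manager.Queue()  # proxy to a queue living in the manager process
        proc = multiprocessing.Process(target=worker, args=(queue,))
        proc.start()
        proc.join()
        print(queue.get())       # the item is still there after the worker exits
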
1 Answer


The main problem with your code is that nobody consumes the data that the worker process puts on the queue. A Python queue expects the data in the queue to be consumed ("flushed to the pipe") before the process that put it there terminates.

From that angle your example does not make much sense, but if you want to make it work:

The key is queue.cancel_join_thread() -- https://docs.python.org/3/library/multiprocessing.html

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue.

^ the relevant section. The problem is that the child process puts items on the queue but nobody consumes them. In that case, queue.cancel_join_thread() must be called in the child process before joining it. This code sample will get rid of the error:

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()
    
    queue.cancel_join_thread() # ideally, this would not be here but would rather be a response to a signal (or other IPC message) sent from the main process


proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()

proc.join()

I did not wire up any IPC for this since there is no consumer at all, but I hope the idea is clear.


How is this an answer? Even without any process, just doing queue.put(1) followed by queue.close() triggers the BrokenPipeError. - agemO
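
A hypothetical single-process snippet along the lines of what agemO describes (whether it actually reproduces depends on timing, just like the original script):

import multiprocessing

queue = multiprocessing.Queue()
queue.put(1)   # hands the item off to the queue's internal feeder thread
queue.close()  # closing immediately can race with the feeder's write and
               # surface as a BrokenPipeError logged by the feeder thread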
