多进程.Queue:管道破裂错误

4
注1: 我想在多个进程中使用multiprocessing.Queue,但在单进程情况下发现了此问题。因此,以下代码使用单个进程简化问题。
类似的问题如下:Broken pipe error with multiprocessing.Queue
该帖子中的答案证明了这个问题是由于主线程在队列线程完成工作之前退出了导致的。他解决这个问题的方法是在代码中添加sleep(0.1):
import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print i
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

不过,我认为睡眠并不是一种稳定的生产代码方法,因此我尝试使用 join 来实现。您可以查看下面的代码,但不幸的是,它无法正常工作。有人知道如何在没有睡眠的情况下完成这个吗?

import multiprocessing
import time


def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # time.sleep(4)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main() 

{btsdaf} - Sraw
{btsdaf} - scott huang
在多进程中,你几乎不会遇到这个问题。因为你将拥有一个可以阻塞主进程的消费者进程。 - Sraw
1
{btsdaf} - scott huang
2个回答

3

让我们先描述一些有关 multiprocessing.Queue 的细节。

当一个对象被放入队列中时,该对象会被 pickled,并且后台线程会将 pickled 数据刷新到底层的 pipe 中。

该 pipe 通过 reader, writer = socket.socketpair() 创建。

queue.close() 适用于多进程,它做了两件事:

  1. 关闭 reader(重要!)
  2. queue.buffer 发送一个 sentinel 值,如果遇到这样的值,后台线程将退出

在单进程情况下,queue.close() 不起作用的原因是,如果 buffer 中仍有数据,后台线程将继续向已关闭的 socket 写入数据,从而导致 Broken pipe 错误。

以下是一个简单的示例来演示该错误。

import socket

reader, writer = socket.socketpair()
writer.send("1")

# queue.close() will internally call reader.close()
reader.close()

# got a Broken pipe error
writer.send("2")

在多进程情况下,仅在主进程中关闭reader只会减少基础套接字的引用计数(主进程和子进程共享套接字),而不是真正地关闭(或关闭)套接字。

1

程序:

import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)

if __name__ == '__main__':
    main()

输出:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
    send_bytes(obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes 
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes 
    self._send(header + buf)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 373, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

BrokenPipeError异常会在一个的队列线程仍将已入队项目发送到队列管道的写端之后,在队列被垃圾回收后自动关闭读端引发。队列管道的写端不会被垃圾回收,因为它也被队列线程引用。

我认为这是一个bug,所以我在GitHub上开了一个pull request

解决方法是要确保在队列被垃圾回收之前,没有留下任何已经排队等待发送给队列线程的项目,需要在垃圾回收之前将所有已排队项目出队:

import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)
    q.get()

if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接