Python多线程+多进程BrokenPipeError(子进程不退出?)

3

当使用多进程JoinableQueue线程创建进程时,我遇到了BrokenPipeError错误。看起来这是因为程序完成工作并尝试退出之后发生的,因为它已经完成了所有应该做的事情。这意味着什么?有没有办法解决这个问题或者可以安全地忽略它?

import requests
import multiprocessing
from multiprocessing import JoinableQueue
from queue import Queue
import threading


class ProcessClass(multiprocessing.Process):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.func = func

    def run(self):
        while True:
            arg = self.in_queue.get()
            self.func(arg, self.out_queue)
            self.in_queue.task_done()


class ThreadClass(threading.Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.func = func

    def run(self):
        while True:
            arg = self.in_queue.get()
            self.func(arg, self.out_queue)
            self.in_queue.task_done()


def get_urls(host, out_queue):
    r = requests.get(host)
    out_queue.put(r.text)
    print(r.status_code, host)


def get_title(text, out_queue):
    print(text.strip('\r\n ')[:5])


if __name__ == '__main__':
    def test():

        q1 = JoinableQueue()
        q2 = JoinableQueue()

        for i in range(2):
            t = ThreadClass(get_urls, q1, q2)
            t.daemon = True
            t.setDaemon(True)
            t.start()

        for i in range(2):
            t = ProcessClass(get_title, q2, None)
            t.daemon = True
            t.start()

        for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
            q1.put(host)

        q1.join()
        q2.join()

    test()
    print('Finished')

程序输出:

200 http://ibm.com
<!DOC
200 http://google.com
<!doc
200 http://yahoo.com
<!DOC
200 http://apple.com
<!DOC
200 http://amazon.com
<!DOC
Finished
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python\33\lib\multiprocessing\connection.py", line 313, in _recv_bytes
    nread, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109]

The pipe has been ended

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python\33\lib\threading.py", line 901, in _bootstrap_inner
    self.run()
  File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run
    arg = self.in_queue.get()
  File "C:\Python\33\lib\multiprocessing\queues.py", line 94, in get
    res = self._recv()
  File "C:\Python\33\lib\multiprocessing\connection.py", line 251, in recv
    buf = self._recv_bytes()
  File "C:\Python\33\lib\multiprocessing\connection.py", line 322, in _recv_bytes
    raise EOFError
EOFError
....

如果我将JoinableQueue切换为queue.Queue用于多线程部分,一切都能解决,但是为什么?

(为其他线程剪切相同的错误。)

1个回答

5
这是因为在主线程退出时,您让后台线程阻塞在multiprocessing.Queue.get调用中,但只有在某些条件下才会发生这种情况:
  1. 一个守护线程正在运行并阻塞在multiprocessing.Queue.get上,当主线程退出时。
  2. 一个multiprocessing.Process正在运行。
  3. multiprocessing上下文是除'fork'之外的其他内容。
异常告诉您的是,当multiprocessing.JoinableQueue在get()调用内部侦听其另一端的Connection发送EOF时。通常这意味着连接的另一端已关闭。这在关闭期间发生是有意义的 - Python在退出解释器之前清理所有对象,其中包括关闭所有打开的Connection对象。我尚未能够找出为什么仅在生成了multiprocessing.Process(而不是默认情况下在Linux上发生的分叉)并且仍在运行时才会始终发生。即使我创建一个只在while循环中睡眠的multiprocessing.Process也可以重现它,完全不需要任何Queue对象。由于某种原因,存在运行的生成子进程似乎保证引发异常。它可能仅仅是导致销毁顺序恰到好处以出现竞争条件的原因,但这只是猜测。
无论如何,使用queue.Queue而不是multiprocessing.JoinableQueue是解决它的好方法,因为您实际上不需要那里的multiprocessing.Queue。您还可以通过向其队列发送sentinel来确保在主线程之前关闭后台线程和/或后台进程。因此,使两个run方法都检查哨兵是个好办法:
def run(self):
    for arg in iter(self.in_queue.get, None):  # None is the sentinel
        self.func(arg, self.out_queue)
        self.in_queue.task_done()
    self.in_queue.task_done()

完成后,发送哨兵:

    threads = []
    for i in range(2):
        t = ThreadClass(get_urls, q1, q2)
        t.daemon = True
        t.setDaemon(True)
        t.start()
        threads.append(t)

    p = multiprocessing.Process(target=blah)
    p.daemon = True
    p.start()
    procs = []
    for i in range(2):
        t = ProcessClass(get_title, q2, None)
        t.daemon = True
        t.start()
        procs.append(t)

    for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
        q1.put(host)

    q1.join()
    # All items have been consumed from input queue, lets start shutting down.
    for t in procs:
        q2.put(None)
        t.join()
    for t in threads:
        q1.put(None)
        t.join()
    q2.join()

非常感谢您提供详尽的回答(附注:或许对某人有帮助:我想使用multiprocessing.JoinableQueue而不是queue.Queue,以便能够将参数从多进程部分传递回应用程序的多线程部分,尽管在我上面的示例中没有这样的代码)。 - Bob

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接