多进程.Queue在“reader”进程死亡后出现死锁问题

5

我一直在使用multiprocessing包,并注意到在以下情况下,队列可能会出现死锁读取:

  1. The "reader" process is using get with timeout > 0:

    self.queue.get(timeout=3)
    
  2. "reader" dies while get is blocking due to timeout.

之后队列将永久锁定。

演示该问题的应用程序

我创建了两个子进程 "Worker"(将任务放入队列)和 "Receiver"(从队列获取任务)。此外,父进程定期检查他的子进程是否仍在运行,必要时启动新的子进程。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import procname
import time

class Receiver(multiprocessing.Process):
    ''' Reads from queue with 3 secs timeout '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            try:
                msg = self.queue.get(timeout=3)
                print '<<< `{}`, queue rlock: {}'.format(
                    msg, self.queue._rlock)
            except multiprocessing.queues.Empty:
                print '<<< EMPTY, Queue rlock: {}'.format(
                    self.queue._rlock)
                pass


class Worker(multiprocessing.Process):
    ''' Puts into queue with 1 sec sleep '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Worker')
        while True:
            time.sleep(1)
            print 'Worker: putting msg, Queue size: ~{}'.format(
                self.queue.qsize())
            self.queue.put('msg from Worker')


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    worker = Worker(queue)
    worker.start()

    receiver = Receiver(queue)
    receiver.start()

    while True:
        time.sleep(1)
        if not worker.is_alive():
            print 'Restarting worker'
            worker = Worker(queue)
            worker.start()
        if not receiver.is_alive():
            print 'Restarting receiver'
            receiver = Receiver(queue)
            receiver.start()

ps中,进程树是什么样子的

bash
 \_ python queuetest.py
     \_ Worker
     \_ Receiver

控制台输出

$ python queuetest.py
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Restarting receiver                        <-- killed Receiver with SIGTERM
Worker: putting msg, Queue size: ~0
Worker: putting msg, Queue size: ~1
Worker: putting msg, Queue size: ~2
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~3
Worker: putting msg, Queue size: ~4
Worker: putting msg, Queue size: ~5
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~6
Worker: putting msg, Queue size: ~7

有没有什么方法可以绕过这个问题?使用get_nowait和sleep的组合似乎是一种解决方法,但它不能“随时读取”数据。

系统信息

$ uname -sr
Linux 3.11.8-200.fc19.x86_64

$ python -V
Python 2.7.5

In [3]: multiprocessing.__version__
Out[3]: '0.70a1'

“一切都能正常工作”的解决方案

在撰写这篇问题时,我对Receiver类做出了一些愚蠢的修改:

class Receiver(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            time.sleep(1)
            while True:
                try:
                    msg = self.queue.get_nowait()
                    print '<<< `{}`, queue rlock: {}'.format(
                        msg, self.queue._rlock)
                except multiprocessing.queues.Empty:
                    print '<<< EMPTY, Queue rlock: {}'.format(
                        self.queue._rlock)
                    break

但是我觉得这并不是很好。

1
这里有一个 Python 问题,回复是“这是预期的”。 - neutrinus
1个回答

2

这可能是因为在Queue.get()中,*not_empty.release()*从未发生(进程已被杀死)。您是否尝试在接收器中捕获TERM信号并在退出之前释放队列互斥锁?


你是指 **_rlock.release()**(这个在Queue.get中使用,据我所知)吗?不,我还没有尝试捕获任何信号,因为我更关注接收器由于错误而崩溃,而不是可以处理的“温和”信号。 - Michael
正如另一个答案所指出的那样,正确的解决方案是使用“鲁棒互斥锁”或“命名信号量”。我建议您报告多进程库中的错误。 - neutrinus

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接