Python多进程队列的可靠性,Queue vs SimpleQueue vs JoinableQueue

12

直接引用自Python 文档:

class multiprocessing.Queue([maxsize])

...

qsize() 返回队列的大约大小。由于多线程/多进程语义,这个数字不可靠。

empty() 如果队列为空则返回True,否则返回False。由于多线程/多进程语义,这是不可靠的。

在我的代码中,我有一堆进程(每一个都是同一个主进程的子进程),每个进程的run方法如下:

while self.active:
    if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
        try:
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except Queue.Empty as empty_queue:
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)

基本上,该过程在 general_queue 等待工作,但首先检查其 exclusive_queue。主进程可以将任务放入进程的通用队列或独占队列中。现在,在 if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0) 中,我首先使用了 self.exclusive_queue.empty(),结果出现了相当奇怪的行为(qsize() 大于30且 empty()=True)。

所以我的问题是,关于 multiprocessing.queues.SimpleQueue 在文档中写道:

empty() 如果队列为空,则返回True,否则返回False。

没有提到其可靠性。 SimpleQueue.empty() 是否可靠?

另外,multiprocessing.JoinableQueue 是否可靠或比 Queue 更可靠,因为它具有 task_done() 机制?

这种方法是否可以被认为是正确的,或者通过子进程之间的共享管道端点进行回调的方法更合适?


在我的情况下,SimpleQueue 只能存储很少的元素 [大约 360 个 (int, str, int)-tuples)]。因此,我决定使用一个 STOP 元素(如 Midnighter 的回答中所提到的)与 Queue 一起使用。对于 Queue,即使是相同结构的 100,000 个元素,我也没有遇到大小限制。 - Markus Dutschke
1个回答

7

虽然不是直接回答,但我越来越依赖于使用守卫条件迭代输入队列。在multiprocessing模块的文档中有一个例子:

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

因此,当您将数据输入队列完成后,您只需向队列中添加与已启动进程数量相同的STOP字符串或其他保护标识。


是的,我在我的服务器上基本上使用相同的方法。如果 compute 获得一个 None,它会将 active 改为 False。然而,正如你在我的代码中所看到的,我有两个队列要等待——“独占”和“常规”队列,其中的想法是“如果独占为空,则从常规队列获取”,但是因为我希望即使在常规队列中有条目,也能够停止进程,所以我将 None 放入了独占队列中。 - dmg
进程可能会在没有正确清理的情况下死亡,因此这种技术让我有些担心。 - Paul Prescod
如果工作进程死亡,那么它无需被停止。如果有其他资源需要清理,而这种方法没有处理,那是真的,但我认为这并不是它的目的。 - Midnighter

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接