如何使Python的多进程队列(multiprocessing Queue)的.empty()方法返回正确的值?或者有什么替代方案?

9

我有一个片段,使用了multiprocess模块中的Queue类。我很困惑一个Queue实例的.empty()方法没有给出我期望的正确值。以下是我的代码:

from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

with locker:  # even with this, still True
    foo.put("bar")

print(foo.empty())  # True, obviously not
print(foo.empty())  # True
print(foo.empty())  # True
print(foo.qsize())  # 1L
print(foo.empty())  # True

然而,如果我使用timesleep函数,即在执行过程中引起一定的时间延迟。 它可以正常工作。
from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

foo.put("bar")

sleep(0.01)

print(foo.empty())  # False
print(foo.empty())  # False
print(foo.empty())  # False
print(foo.qsize())  # 1L
print(foo.empty())  # False

我知道我的替代方案是.qsize() > 0表达式,但我确定我做错了。我做错了什么?
*编辑*
我现在明白它是不可靠的了,谢谢 @Mathias Ettinger。有没有更好的替代方案?我需要可靠地知道我的 Queue 是否为空。

没有任何一个函数,包括empty()full()qsize()都是可靠的。 参考链接 - 301_Moved_Permanently
哈哈,谢谢你引用了那个。有其他的选择吗? - A. K. Tolentino
不确定你想做什么,但也许 JoinableQueue 是你正在寻找的。 - swenzel
1
我认为队列的唯一真正接口是 putgetempty 仅用于近似统计。在处理节点中,我会使用 get(block=False) 立即查看队列是否为空或获取一项工作。 - 9000
2个回答

10

很不幸,队列的复杂实现意味着.empty().qsize()检查不同的内容来做出它们的决策。这意味着它们可能会有一段时间的分歧,就像你所看到的那样。

由于.qsize()在您的平台上是被支持的(这在并非普遍适用),您可以重新实现基于.qsize().empty()检查,这将对您起作用:

# mp.Queue() is a function, not a class, so we need to find the true class
# to subclass
import multiprocessing.queues

class XQueue(multiprocessing.queues.Queue):
    def empty(self):
        try:
            return self.qsize() == 0
        except NotImplementedError:  # OS X -- see qsize() implementation
            return super(XQueue, self).empty()

在幕后,队列.put()是一个复杂的过程:队列将对象放入缓冲区并获取进程间信号量,而隐藏的守护线程负责清空缓冲区并将其内容序列化到管道中。(然后使用者通过从此管道读取并释放进程间信号量来.get()。)所以,这就是为什么在您的示例中睡眠起作用的原因:在调用.empty()之前,守护线程有足够的时间将对象从内存缓冲区移动到I/O表示。

顺便说一下,我觉得这种行为很令人惊讶:处于完全相同的内部状态的队列可能会对“你是否有任何元素入队”的问题给出两个不同的答案(qsize会说“是”,而empty会说“否”)。

我想我理解了这是如何发生的。由于并非所有平台都支持sem_getvalue(),因此并非所有平台都可以实现qsize,但是empty可以通过轮询FIFO来合理地实现。对于支持后者的平台,我本来希望将empty实现为qsize的术语。


你对NotImplementedError有什么评论?我查了文档,但没有关于它的内容。 - User
1
@用户,如果您按照我的回答第二段中的链接 - 关于qsize()在每个平台上都没有实现 - 您将会看到该方法_“...可能在类Unix平台(如Mac OS X)上引发NotImplementedError,因为sem_getvalue()未被实现。”_ - pilcrow
文档中提到,qsizeemptyfull 都是不可靠的。请问您能否提供一个链接,证明 qsize 立即返回正确的队列大小? - Matthew Moisen
@MatthewMoisen,我在5年后重新考虑了这个答案的措辞,并进行了澄清。谢谢。 - pilcrow

7
根据文档,无论是empty()full(),还是qsize()都不是可靠的。
其他可选方案包括:
  • Reading the exact amount of items going through the Queue:

    AMT = 8
    for _ in range(AMT):
        queue.put('some stuff')
    
    for _ in range(AMT):
        print(queue.get())
    

    This is useful if you know beforehand how many items must be processed in total or how many will be processed by each thread.

  • Reading items until a guardian appears:

    num_threads = 8
    guardian = 'STUFF DONE'
    
    while num_threads:
        item = queue.get()
        if item == guardian:
            num_threads -= 1
        else:
            process(item)
    

    This is helpful if every thread have a variable amount of work (and you don't know the total beforehand) to do but can determine when it’s done.


1
虽然这一切都是真的,但队列大小方法通常是“不可靠”的,因为获取信息后很快就可能过时,而不是与实际队列状态不一致(这是 OP 注意到的内容)。此外,使用哨兵或先验知识来了解队列中项目数量的技术是很好的,可以知道何时完成处理,但不清楚这是否是 OP 对 .empty() 的用例。 - pilcrow

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接