如何清空一个多进程队列?

36

我只想知道如何像在Python中清空queue.Queue一样清空multiprocessing.Queue

>>> import queue
>>> queue.Queue().clear()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Queue' object has no attribute 'clear'
>>> queue.Queue().queue.clear()
>>> import multiprocessing
>>> multiprocessing.Queue().clear()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Queue' object has no attribute 'clear'
>>> multiprocessing.Queue().queue.clear()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Queue' object has no attribute 'queue'

1
在普通队列的情况下,您可以通过normal_q.queue.clear()清除其内容。但是我需要使用多进程队列。谢谢回复。 - FelipeG
5
这不是对你根本问题的回答,但我觉得有必要指出你问题中的 import 语句会相互覆盖。如果你使用上面写的代码,multi_qnormal_q 都将成为常规的 Queue.Queue 实例。为了使其工作,你需要只导入模块并使用完全限定名称来调用类(例如:multi_q = multiprocessing.Queue()),或者使用 as 关键字在导入时为它们命名(例如:from Queue import Queue as qQueue)。 - Blckknght
5个回答

73

所以,我看了一下队列类(Queue class),你可以尝试这段代码:

while not some_queue.empty():
    some_queue.get()  # as docs say: Remove and return an item from the queue.

18
while not some_queue.empty(): 这个代码更符合 Python 风格。 - Jonathan
4
这个答案应该被选中,因为它实际上提供了解决方案,而不仅仅是说“没有直接的方法可以做到这一点”。 - exfizik
11
根据您的实现方式,当调用.empty()时队列可能不为空,但在调用.get()时它实际上已经为空,这会导致竞态条件。如果是这种情况,由于.get()是阻塞的,您的程序可能会挂起。使用.get_nowait()可以避免这种竞态条件。 - Manuel J. Diaz
8
警告:当调用not some_queue.empty()且在调用some_queue.get()之前,如果队列中只剩下一个项目,则会发生阻塞。这在多线程环境中是完全可能的。我的ClearableQueue解决方案避免了这种情况。 - Dan H
4
@PirateApp:是的,get_nowait()可以避免这个问题……但你必须捕获Empty异常……到那时,基本上你已经实现了我的解决方案,该方案位于https://dev59.com/JWQn5IYBdhLWcg3w7ayK#36018632中--如果你可以一直使用`get_nowait()`直到出现`Empty`,为什么还要检查`.empty()`呢? - Dan H
显示剩余3条评论

20

在征得许可之前请求原谅;尝试清空队列直到收到Empty异常,然后忽略该异常:

from Queue import Empty

def clear(q):
    try:
        while True:
            q.get_nowait()
    except Empty:
        pass
更好的方法是:内置类是否缺少您想要的方法?继承内置类,并添加您认为应该存在的方法!
from Queue import Queue, Empty

class ClearableQueue(Queue):

    def clear(self):
        try:
            while True:
                self.get_nowait()
        except Empty:
            pass

您的 ClearableQueue 类继承了内置 Queue 类的所有优点(和行为),并且具有您现在想要的方法。

只需在使用 Queue() 的所有位置上使用 q = ClearableQueue(),并在需要时调用 q.clear()


5
这段代码存在一个错误,因为在Python中,即使队列已满(是的,真的),get_nowait函数也可能抛出Empty异常。 - Geoffrey Irving
3
我想我会认为这是Python中的一个错误...而不是我的示例代码!说真的:你是在提到已记录的功能吗?还是你只是指出了由于在此方法返回时,可能已经有其他内容被推送到队列上而导致的竞态条件。如果是后者...好吧,是的,那样可能发生。多线程处理就是这样。但这仍然不是我提出的clear()实现的错误。 - Dan H
1
我在实践中发现,我需要这种方法而不是检查,因为多线程可以在“if”和“get”之间更改事物。 - Daniel Möller
1
根据文档(https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues)所述,“multiprocessing使用通常的queue.Empty和queue.Full异常来表示超时”,因此我认为假设异常“queue.Empty”意味着队列为空是错误的。 - Stan
1
另外,如果要对multiprocessing.Queue进行子类化,这是行不通的,因为multiprocessing.Queue是一个函数而不是类,所以您需要对multiprocessing.queues.Queue进行子类化。 - Irv
显示剩余6条评论

3

清空 multiprocessing.Queue 没有直接的方法。

我认为最接近的方法是 close(),但它仅表示不会再向该队列推送更多数据,并在所有数据已刷新到管道时关闭它。


谢谢回复,我尝试使用close()方法,但问题是我有一个while循环,如下所示: <code> while not someQueue.empty(): 做一些事情 结束进程 </code> 因此,我想清空队列以便while循环结束。但是,如果我关闭队列,会引发错误。 - FelipeG

0

pipe(7) Linux 手册页面 指定管道具有有限的容量(默认为 65,536 字节),并且在向已满的管道写入数据时会阻塞,直到从管道中读取足够的数据以允许写入完成为止:

管道和FIFO上的I/O

[...]

如果一个进程试图从空管道中读取数据,那么read(2)将会阻塞直到有数据可用。如果一个进程试图向满管道中写入数据(见下文),那么write(2)将会阻塞直到足够的数据被从管道中读取以允许写入完成。可以通过使用fcntl(2)F_SETFL操作来启用O_NONBLOCK打开文件状态标志来实现非阻塞I/O。

[...]

管道容量

管道具有有限的容量。如果管道已满,则write(2)将会阻塞或失败,这取决于是否设置了O_NONBLOCK标志(见下文)。不同的实现对管道容量有不同的限制。应用程序不应该依赖于特定的容量:应该设计一个读取进程在数据可用时立即消耗数据的应用程序,以便写入进程不会保持阻塞。

在Linux 2.6.11之前的版本中,管道的容量与系统页面大小相同(例如,在i386上为4096字节)。自Linux 2.6.11以来,管道容量为16页(即,在页面大小为4096字节的系统中为65536字节)。自Linux 2.6.35以来,默认管道容量为16页,但可以使用fcntl(2)F_GETPIPE_SZF_SETPIPE_SZ操作查询和设置容量。有关更多信息,请参见fcntl(2)

这就是为什么multiprocessing Python库文档建议在生产者进程中使用Queue.join_thread调用(隐式地进行垃圾回收或显式地进行)之前,通过Queue.get调用使每个Queue对象的消费者进程为空:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

An example which will deadlock is the following:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

A fix here would be to swap the last two lines (or simply remove the p.join() line).

在某些应用程序中,消费者进程可能不知道生产者进程已经向队列添加了多少项。在这种情况下,一种可靠的清空队列的方法是让每个生产者进程在完成时添加一个哨兵项,并让消费者进程删除项目(常规和哨兵项),直到它删除了与生产者进程数量相同的哨兵项为止:
import multiprocessing

def f(q, e):
    while True:
        q.put('X' * 1000000)  # block the feeder thread (size > pipe capacity)
        if e.is_set():
            break
    q.put(None)  # add a sentinel item

if __name__ == '__main__':
    start_count = 5
    stop_count = 0
    q = multiprocessing.Queue()
    e = multiprocessing.Event()
    for _ in range(start_count):
        multiprocessing.Process(target=f, args=(q, e)).start()
    e.set()  # stop producer processes
    while stop_count < start_count:
        if q.get() is None:  # empty the queue
            stop_count += 1  # count the sentinel items removed

此解决方案使用 阻塞式Queue.get 调用来清空队列。这可以确保 所有 项目都已添加到队列并已被移除。

@DanH 的解决方案 使用 非阻塞式Queue.get_nowait 调用来清空队列。该解决方案的问题在于,生产者进程仍然可以在消费者进程清空队列后向队列添加项目,这将导致死锁(消费者进程将等待生产者进程终止,每个生产者进程将等待其馈线程终止,每个生产者进程的馈线程将等待消费者进程移除添加到队列中的项目):

import multiprocessing.queues

def f(q):
    q.put('X' * 1000000)  # block the feeder thread (size > pipe capacity)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=f, args=(q,))
    p.start()
    try:
        while True:
            q.get_nowait()
    except multiprocessing.queues.Empty:
        pass  # reached before the producer process adds the item to the queue
    p.join()  # deadlock

或者,如果与其作为属性一起提供的队列的同步资源在垃圾回收之前被清除,新创建的生产者进程可能无法反序列化消费者进程的Process对象,从而引发FileNotFoundError异常:

import multiprocessing.queues

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    multiprocessing.Process(target=f, args=(q,)).start()
    try:
        while True:
            q.get_nowait()
    except multiprocessing.queues.Empty:
        pass  # reached before the producer process deserialises the Process

标准错误:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

-1

我是一个新手,所以请不要对我生气,但是

为什么不重新定义.Queue()变量呢?

import multiprocessing as mp

q = mp.Queue()
chunk = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

for i in chunk:
    q.put(i)
print(q.empty())

q = mp.Queue()
print(q.empty())

我的输出:

>>False
>>True

我现在只是自学,如果我有错误,请随时指出


2
这是为什么不起作用的原因:1)如果您正在进行多进程处理,则意味着一个或多个进程正在向队列中添加内容,而一个或多个进程正在从队列中删除内容。2)您的代码没有清除第一个Queue实例。相反,在执行您的代码的进程中,您只是停止“关注”第一个Queue,创建了一个新的Queue,并将其分配给同一变量。3)因此,任何具有指向第一个Queue的指针的进程仍然将其视为非空。 - Dan H

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接