Python中的队列和可连接队列有什么区别?

28

在使用Python的multiprocessing模块时,有两种队列:

  • Queue
  • JoinableQueue。

它们之间有什么区别?

Queue

from multiprocessing import Queue
q = Queue()
q.put(item) # Put an item on the queue
item = q.get() # Get an item from the queue

可连接队列

from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # Signal task completion
q.join() # Wait for completion
2个回答

15

JoinableQueue具有join()task_done()方法,而Queue没有。


class multiprocessing.Queue( [maxsize] )

返回使用管道和一些锁/信号量实现的进程共享队列。当进程第一次将一个项目放入队列中时,会启动一个喂饱线程,该线程将对象从缓冲区传输到管道中。

为了表示超时,引发标准库 Queue 模块的常规 Queue.Empty 和 Queue.Full 异常。

Queue 实现了 Queue.Queue 的所有方法,但不包括 task_done() 和 join() 方法。


JoinableQueue是Queue的子类,它除了具有Queue的功能外,还有task_done()和join()方法。 指示先前排队的任务已完成。由队列消费者线程使用。每个用于获取任务的get()调用之后,对task_done()的后续调用告诉队列处理该任务已完成。
如果join()当前正在阻塞,则当所有项目都已处理(也就是说,已为放入队列的每个项目收到了task_done()调用)时,它将恢复。
如果调用次数超过放置在队列中的项目数量,则引发ValueError。 阻塞,直到队列中的所有项目都被获取并处理完毕。
未完成任务的计数会随着向队列添加项目而增加。当消费者线程调用task_done()表示已检索到该项并且所有工作已完成时,计数会减少。当未完成任务的计数降至零时,join()解除阻塞。
如果您使用JoinableQueue,则必须为从队列中删除的每个任务调用JoinableQueue.task_done(),否则用于计算未完成任务数量的信号量可能最终会溢出,导致异常。

你知道溢出值是多少,或者如何找到它吗?文档上没有说明。 - Arthur Dent
23
除了引用文档之外,这个答案还有什么价值?提供一个示例和一些解释会更好。 - Xavier Bourret Sicotte
2
能否添加一个示例代码呢?这将会改善答案。目前它对我帮助不大,抱歉。 - buhtz

6
根据文档,很难确定Queue是否真的为空。使用JoinableQueue,您可以通过调用q.join()等待队列为空。在需要将工作完成为不同批次的情况下,在每个批次结束时进行离散操作,这可能是有帮助的。
例如,也许您通过队列每次处理1000个项目,然后向用户发送推送通知,告诉他们您已经完成了另一个批次。这对于普通的Queue来说是具有挑战性的。
它可能看起来像:
import multiprocessing as mp

BATCH_SIZE = 1000
STOP_VALUE = 'STOP'

def consume(q):
  for item in iter(q.get, STOP_VALUE):
    try:
      process(item)
    # Be very defensive about errors since they can corrupt pipes.
    except Exception as e:
      logger.error(e)
    finally:
      q.task_done()

q = mp.JoinableQueue()
with mp.Pool() as pool:
  # Pull items off queue as fast as we can whenever they're ready.
  for _ in range(mp.cpu_count()):
    pool.apply_async(consume, q)
  for i in range(0, len(URLS), BATCH_SIZE):
    # Put `BATCH_SIZE` items in queue asynchronously.
    pool.map_async(expensive_func, URLS[i:i+BATCH_SIZE], callback=q.put)
    # Wait for the queue to empty.
    q.join()
    notify_users()
  # Stop the consumers so we can exit cleanly.
  for _ in range(mp.cpu_count()):
    q.put(STOP_VALUE)

注意:我实际上没有运行过这段代码。如果你从队列中取出项目的速度比放入更快,那么你可能会提前完成。在这种情况下,此代码至少每1000个项目发送一次更新,也可能更频繁。对于进度更新,这可能是可以接受的。如果确切地需要1000个,则可以使用 mp.Value('i', 0) 并检查当你的 join 释放时是否为1000。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接