将 multiprocessing.Queue 转化为列表

32

我希望将一个 multiprocessing.Queue 转换成列表。为此,我编写了以下函数:

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items \
            left" % (len(result), current_size))
            # END DEBUG CODE

            return result

但出于某种原因它不起作用。

请观察以下的shell会话:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 
这里发生了什么?为什么没有倾倒所有的物品?

这里发生了什么?为什么没有倾倒所有的物品?

2个回答

29

试试这个:

import Queue
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)

多进程队列有一个内部缓冲区,其中有一个饲料线程从缓冲区中取出工作并将其刷新到管道中。如果并非所有对象都已被刷新,则可能会提前引发Empty异常。使用哨兵表示队列结束是安全(且可靠)的。此外,使用iter(get, sentinel)习语比依赖于Empty更好。

我不喜欢它可能由于刷新时间而引发Empty异常(我添加了time.sleep(.1)以允许与饲料线程进行上下文切换,您可能不需要它,没有它也能正常运行 - 这是一个释放GIL锁的习惯)。


3
杰西,你的想法不错,但为了更加安全可靠,最好使用 uuid 字符串作为哨兵(或者对于线程而非多进程,可以使用特定的 sentinel=object()),而不是通用字符串。即便如此,如果另一个线程同时访问,则仍然可能出现问题;唯一真正安全的方式是依赖于队列内部机制,可惜这是唯一的方法!-) - Alex Martelli
你说得对。我采用了一个字符串哨兵的“快速”解决方案,但这只适用于特定情况。我开始怀疑mp.queue是否需要在队列中构建一些哨兵支持。 - jnoller
1
谢谢你的回答。我今天遇到了一个类似的问题,这个回复帮助我解决了。完整的问题描述在这里:http://www.bryceboe.com/2011/01/28/the-python-multiprocessing-queue-and-large-objects/ - bboe

3
# in theory:
def dump_queue(q):
    q.put(None)
    return list(iter(q.get, None))

# in practice this might be more resilient:
def dump_queue(q):
    q.put(None)
    return list(iter(lambda : q.get(timeout=0.00001), None))

# but neither case handles all the ways things can break
# for that you need 'managers' and 'futures' ... see Commentary

我喜欢使用None作为哨兵,但我倾向于支持jnoller的观点,认为mp.queue可以使用一个安全而简单的哨兵。他对提前引发空值风险的评论也是有效的,请参见下文。
评论:
这是老问题了,Python已经改变了,但是如果你在MP Python中遇到列表<->队列的问题,这确实会成为一个障碍。所以,让我们深入了解一下:
首先,这不是一个错误,而是一个特性:https://bugs.python.org/issue20147。为了节省您阅读该讨论和文档中更多细节的时间,以下是一些要点(有点哲学,但我认为这可能会帮助一些刚开始使用MP / MT的Python):
  • MP队列是具有跨不同线程、不同进程以及实际上可以跨不同(网络)计算机进行通信的结构。
  • 一般而言,在并行/分布式系统中,严格的同步是昂贵的,因此每次使用任何MP / MT数据结构的API的部分时,您需要查看文档以了解它承诺做什么或不做什么。提示:如果函数不包括"lock"或"semaphore"或"barrier"等单词,则会是"异步"和"尽力而为"(近似),或者你可以称之为"不可靠的"。
  • 特别针对这种情况:Python是一种解释型语言,具有著名的单个解释器线程和其著名的"全局解释器锁定(GIL)"。 如果您的整个程序是单进程、单线程的,则一切都很顺利。 如果不是(使用MP),您需要给解释器一些空间。 time.sleep() 是您的朋友。 在这种情况下,超时。
在你的解决方案中,你仅仅使用了不可靠的函数——get()和qsize()。而且事实上,代码比你想象的更糟糕——如果增加队列和对象的大小,很可能会出现问题。

enter image description here

现在,您可以处理不稳定的例程,但需要给它们活动的空间。在您的示例中,您只是在强行使用队列。您只需要更改行thing = queue.get(block=False),改为thing = queue.get(block=True,timeout=0.00001)即可。

时间0.00001被精心选择(10^-5),这是您可以安全设置的最小值(这就是艺术与科学相遇的地方)。

一些关于为什么需要超时的评论:这与MP队列工作原理的内部相关。当你将某物放入MP队列中时,它实际上并没有被放入队列,而是排队等待最终到达。这就是为什么qsize()会给出正确结果的原因——代码的这一部分知道队列中有一堆东西。你只需要意识到,在队列中的对象和“我现在可以读取它”不是同一件事情。把MP队列想象成使用USPS或FedEx发送信件——你可能会得到一个收据和跟踪号码,显示“它已经寄出了”,但收件人还不能打开它。现在,更具体地说,在你的情况下,你立即可以访问0个项目。这是因为你运行的单个解释器线程还没有机会处理那些“排队等待”的东西,所以你的第一个循环只是为队列排队了一堆东西,但你立即强制你的单个线程尝试在甚至没有为你排列一个对象的情况下执行get()。
有人可能会认为这些超时会减慢代码运行速度。实际上,MP队列是重量级构造,你应该只在传递相当耗费资源的“东西”时使用它们,要么是庞大的数据块,要么是至少复杂的计算。添加10^-5秒的操作实际上是给解释器一个执行线程调度的机会——此时它将看到你积压的put()操作。

注意

上述说法并非完全正确,这(可以说)是get()函数设计的一个问题。将timeout设置为非零值的语义是,get()函数不会在返回Empty之前阻塞超过这个时间。但它实际上可能不是Empty(尚未)。因此,如果你知道你的队列里有一堆东西要获取,那么第二种解决方案更好,甚至可以使用更长的超时时间。我个人认为他们应该保持timeout=0的行为,但是有一些实际内置的1e-5容差,因为很多人会对MP结构中的gets和puts发生的事情感到困惑。
在你的示例代码中,实际上你并没有启动并行进程。如果我们这样做,那么你会开始得到一些随机结果-有时只会删除一些队列对象,有时它会挂起,有时它会崩溃,有时会发生多个事情。在下面的示例中,一个进程崩溃了,另一个进程挂起了:

enter image description here

问题在于插入哨兵时需要知道队列已经完成。这应该作为队列逻辑的一部分来完成-例如,如果您有一个经典的主从设计,则主节点在最后一个任务被添加时需要推送哨兵(结束)。否则,将出现竞争条件。
“正确”的(有弹性的)方法是涉及到管理器期货
import multiprocessing
import concurrent.futures

def fill_queue(q):
    for i in range(5000):
        q.put([range(200) for j in range(100)])

def dump_queue(q):
    q.put(None)
    return list(iter(q.get, None))

with multiprocessing.Manager() as manager:
    q = manager.Queue()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.submit(fill_queue, q)  # add stuff
        executor.submit(fill_queue, q)  # add more stuff
        executor.submit(fill_queue, q)  # ... and more
        
    # 'step out' of the executor
    l = dump_queue(q)

# 'step out' of the manager
print(f"Saw {len(l)} items")

让经理处理您的MP构造(队列,字典等),在其中让未来处理您的进程(如果需要,在其中让另一个未来处理线程)。这确保了在“展开”工作时清理事物。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接