如何将队列引用传递给由pool.map_async()管理的函数?

43

我希望一个长时间运行的进程可以通过队列(或类似的东西)返回其进度,然后我将提供给进度条对话框。当进程完成时我还需要它的结果。这里的测试示例失败了,出现了RuntimeError: Queue objects should only be shared between processes through inheritance

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

我已经能够使用单独的Process对象使其工作(在那里我被允许传递Queue引用),但是那时候我没有池来管理我想要启动的多个进程。是否有更好的模式建议?


这不是对你问题的答案,但是你可以尝试使用execnet库http://codespeak.net/execnet/来进行多进程映射。内置的multiprocessing还有一些问题需要修复(请参考Python追踪器)。除此之外,它的源代码相当庞大且复杂。对我来说,execnet库看起来比multiprocessing要好得多。 - Andrey Vlasovskikh
2个回答

58

以下代码似乎可以工作:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

请注意,队列是从 manager.Queue() 获得的,而不是 multiprocessing.Queue()。感谢 Alex 指引我朝着正确方向前进。


+1,只是想简单地说一下,你的问题帮助了我解决今天遇到的一个问题。我找到了队列的 Manager 版本,但我的代码没有工作,因为我依赖于全局变量。它需要像你所做的那样作为参数传递。 - winwaed

8
使 q 成为全局变量的方法...:
import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

如果您需要多个队列,例如为了避免混淆各个池进程的进度,全局队列列表应该可以解决问题(当然,每个进程都需要知道在列表中使用哪个索引,但将其作为参数传递是可以的;-)。

2
@David,你可以尝试一下;如果你的真实代码不能以这种简单的方式工作,那么你需要提高复杂度并选择一个管理器(它可以为你提供队列代理等)。 - Alex Martelli
这似乎根本不起作用。在我的机器上,q从未返回任何内容,q.empty()始终为True。即使我将睡眠调用增加到10秒,这应该是任务将几条消息放入队列的过度时间,q.empty始终返回True。 - David
是的,我复制了您发布的代码,虽然结果被返回(我在列表中得到了10个“完成”),但队列从未返回任何内容。调试器显示q始终返回q.empty() == True。Windows 7,ActivePython 2.6.5.12 - David
@David,正如我所说,在Mac上运行良好(没有Windows可以检查)。噢,那看起来经理是您唯一可行的选择。 - Alex Martelli
1
这个在 Mac 上运行而不是在 Windows 上运行有一个很好的原因。在 Windows 中,创建新进程的默认上下文是 spawn,因为 Fork 不可用。 - micsthepick
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接