Python多进程队列故障

10

我创建了100个子进程

proc_list = [
    Process(target = simulator, args=(result_queue,))
    for i in xrange(100)]

并启动它们

for proc in proc_list: proc.start()

每个进程在处理一些数据后,将10000个元组放入结果队列(multiprocessing.Queue的一个实例)。

def simulate(alg_instance, image_ids, gamma, results,
                     simulations, sim_semaphore):
  (rs, qs, t_us) =  alg_instance.simulate_multiple(image_ids, gamma,
                                             simulations)
  all_tuples = zip(rs, qs, t_us)
  for result in all_tuples:
    results.put(result)
  sim_semaphore.release()

我本应该从队列中获取1000000个元组,但在多次运行后,我得到了以下(样本)大小: 14912 19563 12952 13524 7487 18350 15986 11928 14281 14282 有什么建议吗?


你确定 "simulate()" 实际上返回了 10,000 个元组吗? - jterrace
是的。我已经进行了广泛的测试... - user1451817
4
你是否将每个进程加入以确保等待它们全部完成? - K. Brafford
3
给变量命名为“tuple”是一个非常糟糕的想法。“tuple”是一种基本类型,你不应该用其他东西替换它在你的命名空间中。 - K. Brafford
@K.Brafford 我并没有真正给它们命名为元组 - 只是写了函数的草图。我已经更新了函数代码。 - user1451817
3个回答

23

我解决多进程问题的方法几乎总是使用管理器对象。虽然暴露的接口相同,但底层实现更简单,出错也更少。

from multiprocessing import Manager
manager = Manager()
result_queue = manager.Queue()

试一试,看看它是否能解决你的问题。


2
我开始对multiprocessing.Queue感到失去耐心,想象中一定是我犯了什么错误,但实际上没有任何改变,只是将其替换为manager.Queue,然后它就完美地工作了:--非常感谢您的建议。 - Hrvoje Špoljar

6

multiprocessing.Queue在文档中被称为线程安全的。但是,在使用Queue进行进程间通信时,应该使用multiprocessing.Manager().Queue()。


2

从原帖中没有证据表明multiprocessing.Queue不起作用。原帖中发布的代码并不足以理解正在发生什么:他们是否加入了所有进程?他们是否正确地将队列传递给子进程(如果在Windows上,则必须作为参数)?他们的子进程是否验证了它们实际上获得了10000个元组等。

有可能OP真的遇到了一个难以复制的mp.Queue错误,但考虑到CPython经过了大量的测试,并且我刚刚运行了100个进程x 10000个结果而没有任何问题,我怀疑OP实际上在自己的代码中遇到了一些问题。

是的,其他答案中提到的Manager().Queue()是共享数据的完全可靠的方法,但没有理由基于未经证实的报告“有问题”而避免使用multiprocessing.Queue()


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接