共享一个结果队列给多个进程使用

123

multiprocessing模块的文档展示了如何将队列传递给使用multiprocessing.Process启动的进程。但是我如何与使用apply_async启动的异步工作进程共享一个队列呢?我不需要动态加入或其他任何东西,只需要一种方法让工作进程(反复)向基础进程报告其结果。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

这会导致错误: RuntimeError: Queue objects should only be shared between processes through inheritance。 我理解了这个意思,并且我也理解了继承而不是要求pickle / unpickle(以及所有特殊的Windows限制)的建议。但是我该如何以能够正常工作的方式传递队列呢?我找不到示例,并且我尝试了几种失败了的替代方法。请帮忙?


对于未来想要与工作进程共享队列但不使用管理器并牺牲性能的读者来说,此答案可能值得一看。可以尝试使用这个答案中提到的方法创建可序列化的队列 https://dev59.com/PsXsa4cB1Zd3GeqPusKu#75247561 - Charchit Agarwal
@charchit,谢谢!不冒犯,但是你在链接答案中的解决方案看起来很复杂:大量代码,评论中的长讨论,以OP的“免责声明”结束。对于实际编写具有无数请求的电子商务网站的人来说,您的解决方案是否真的有一个显著的性能提升,可以证明所有这些努力都是值得的?在这里接受的答案中的6行代码几乎是最简单的。 - alexis
相对而言,引入管理器在您的代码中会对性能产生明显影响。首先,在使用Manager.Queue()时,每个对象都需要进行两次pickle/unpickle操作,而不是普通队列的一次(一次用于发送到/从管理进程,另一次用于检索/放置对象在队列上)。其次,每个托管对象上的方法调用在调用方法之前需要花费1000倍的时间来解析。这些问题成为性能敏感应用程序的主要瓶颈,但实际上可能并不那么严重(1/2)。 - Charchit Agarwal
这是因为,绝对来说,即使在托管对象上名称解析慢了1000倍,它仍然只会引入0.001秒的延迟,即使双重pickling / unpickling可能成为定期将大项目放入队列的代码的严重瓶颈,如果您要放入队列的只是小字符串和文本,则其影响实际上可能是可以忽略不计的。因此,是否值得额外麻烦地使可picklable队列取决于您的用例的技术细节,但对于大多数情况,最好只使用Manager.Queue()(2/2)。 - Charchit Agarwal
感谢您仔细的解释。 "比较而言",执行时间可能增加10倍,只要我的任务在几秒钟内完成...甚至几分钟,考虑到替代方案的复杂性(好吧,是感知上的复杂性 - 您的解决方案有点可怕 :-D)。简而言之,我并不是要反对,但如果我真的需要这个,我会等待实际基准测试结果出来再做决定。 - alexis
2个回答

166

尝试使用multiprocessing.Manager来管理您的队列,并使其可以被不同的工作进程访问。

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))

26
为什么queue.Queue()不适合这个任务? - mrgloom
6
queue.Queue 是为线程而建的,使用内存锁。在多进程环境中,每个子进程将获得自己内存空间中的 queue.Queue() 实例的副本,因为子进程不共享内存(大多数情况下)。 - LeoRochael
1
@alexis 在多个工作线程将数据插入Manager().Queue()之后,如何从中获取元素? - MSS
@LeoRochael,这并没有解释为什么在多进程环境中使用继承时,那些queue.Queue仍然可用,我指的是Process而不是Pool。 - g.pickardou
显示剩余2条评论

26

multiprocessing.Pool已经有一个共享的结果队列,因此没有必要再额外使用Manager.QueueManager.Queue在幕后是queue.Queue(多线程队列),位于单独的服务器进程上并通过代理公开。与Pool内部队列相比,这会增加额外的开销。与依赖于Pool本身的结果处理不同,Manager.Queue中的结果也不能保证顺序。

工作进程不是使用.apply_async()启动的,这已经在实例化Pool时发生。当您调用pool.apply_async()时启动了一个新的“任务” 。 Pool的工作进程在幕后运行multiprocessing.pool.worker函数。此函数负责处理传输到Pool的内部Pool._inqueue的新“任务”,并将结果发送回父进程通过Pool._outqueue。指定的func将在multiprocessing.pool.worker内执行。 func只需return某些内容即可自动将结果发送回父进程。

.apply_async()立即(异步)返回AsyncResult对象(别名为ApplyResult)。您需要在该对象上调用.get()(阻止)才能接收实际结果。另一个选择是注册回调函数,它会在结果准备就绪时触发。

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

实例输出:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

注意:对于.get()指定timeout参数不会停止worker实际处理任务,它只通过引发multiprocessing.TimeoutError来解除等待的父进程的阻塞。


有趣,我会在第一时间尝试它。在2012年肯定不是这样工作的。 - alexis
@alexis Python 2.7(2010年)在这里相关的只是缺少上下文管理器和apply_asyncerror_callback参数,因此它并没有改变太多。 - Darkonaut
我发现回调函数是最有用的,尤其是与部分函数结合使用以允许使用常规列表来收集异步结果,如此处所述:https://gist.github.com/Glench/5789879 - user5359531
如果你想从子进程中获取数据流而不是单个结果,那么这种方法行不通。 - user48956

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接