对于需要从不同任务中修改相同共享数据的代码进行并行化是困难的。因此,我假设setinner
和setouter
是非变异函数;如果这不是真的,事情会更加复杂。
第一步是决定您想要在并行中执行什么操作。
有一个显而易见的事情就是同时执行 setinner
和 setouter
。它们彼此完全独立,总是需要同时完成。因此,我将这样做。不再像这样操作:
inneropt,partition,x = setinner(Q,G,n)
outeropt = setouter(Q,G,n)
我们希望将这两个函数作为任务提交给进程池,然后等待两者都完成,最后获取两者的结果。
concurrent.futures
模块(在 Python 2.x 中需要第三方后备)使得像“等待两者都完成”这样的操作比 multiprocessing
模块更容易(在 2.6+ 的 stdlib 中),但在这种情况下,我们不需要任何花哨的功能;如果其中一个提前完成,我们也没有什么事可做,直到另一个完成。因此,让我们使用 multiprocessing.apply_async
:
pool = multiprocessing.Pool(2)
while i < 1000:
inner_result = pool.apply_async(setinner, (Q, G, n))
outer_result = pool.apply_async(setouter, (Q, G, n))
inneropt,partition,x = inner_result.get()
outeropt = outer_result.get()
你可能希望将池移到函数外面,这样它就可以永久存在并被代码的其他部分使用。如果不是这样,你几乎肯定希望在函数结束时关闭池。(后来的multiprocessing
版本允许你在with
语句中直接使用池,但我认为这需要Python 3.2+,所以你必须明确地这样做。)
如果您想要更多并行工作怎么办?嗯,在不重构循环的情况下,这里没有其他明显的事情可做。在从setinner和setouter获取结果之前,您不能执行updateGraph,而这里没有其他慢的地方。
但是,如果您可以重新组织每个循环的setinner,使其独立于之前的所有内容(根据您的算法可能或可能不可能 - 我不知道您在做什么,所以无法猜测),则可以将2000个任务一次性推送到队列中,然后通过仅在需要时获取结果来循环。例如:
pool = multiprocessing.Pool()
inner_results = []
outer_results = []
for _ in range(1000):
inner_results.append(pool.apply_async(setinner, (Q,G,n,i))
outer_results.append(pool.apply_async(setouter, (Q,G,n,i))
while i < 1000:
inneropt,partition,x = inner_results.pop(0).get()
outeropt = outer_results.pop(0).get()
当然,你可以让它更加复杂。
例如,假设你很少需要超过几百次迭代,那么总是计算1000次就会浪费资源。你可以在启动时只推送前N个,并在每次循环时推送一个(或每N次推送N个),这样你永远不会做多于N个的无用迭代——你不能得到完美并行和最小化浪费之间的理想权衡,但通常你可以调整得很好。
此外,如果任务实际上并不需要那么长时间,但你有很多任务,你可能想要将它们批量处理。一种非常简单的方法是使用其中一种map
变体而不是apply_async
;这可能会使你的获取代码稍微复杂一些,但它使排队和批处理代码完全变得轻松(例如,对于具有10个参数的100个参数列表中的每个func
进行chunksize
映射只需要两行简单的代码)。
Q,G,n
对象很大吗?您是否在每次迭代中更新它们的几乎所有值(我假设它们是复合对象),还是仅更新其中的一小部分? - jfs