如何在Python中使用多进程技术

5

我刚学Python,想要在下面的代码中进行并行编程,并且想使用Python中的multiprocessing实现。那么该如何修改代码呢?我一直在搜索使用Pool的方法,但发现可以跟随的示例很有限。有没有人能帮助我?谢谢。

请注意,setinner和setouter是两个独立的函数,我想在这里使用并行编程来减少运行时间。

def solve(Q,G,n):
    i = 0
    tol = 10**-4

    while i < 1000:

        inneropt,partition,x = setinner(Q,G,n)
        outeropt = setouter(Q,G,n)

        if (outeropt - inneropt)/(1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G,node1,node2)
        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

3
那段代码的缩进不正确,会导致语法错误。你能否更正一下代码? - user764357
1
顺便提一下,这段代码似乎可能使用了一些矩阵或图形库,这些库可能是作为C扩展模块构建的,并且可能在没有持有GIL的情况下运行缓慢操作(例如NumPy),因此您可以只使用“线程”而不是“多进程”,甚至在适当时自动进行数据并行处理(例如SciPy或KDT的某些部分,如果您已经正确配置和安装了先决条件),在这种情况下,“多进程”实际上会减慢您的速度... - abarnert
这三个函数(setinnner,setouter,updateGraph)的相对运行时间是多少?Q,G,n 对象很大吗?您是否在每次迭代中更新它们的几乎所有值(我假设它们是复合对象),还是仅更新其中的一小部分? - jfs
1个回答

1

对于需要从不同任务中修改相同共享数据的代码进行并行化是困难的。因此,我假设setinnersetouter是非变异函数;如果这不是真的,事情会更加复杂。

第一步是决定您想要在并行中执行什么操作。


有一个显而易见的事情就是同时执行 setinnersetouter。它们彼此完全独立,总是需要同时完成。因此,我将这样做。不再像这样操作:

inneropt,partition,x = setinner(Q,G,n)
outeropt = setouter(Q,G,n)

我们希望将这两个函数作为任务提交给进程池,然后等待两者都完成,最后获取两者的结果。

concurrent.futures 模块(在 Python 2.x 中需要第三方后备)使得像“等待两者都完成”这样的操作比 multiprocessing 模块更容易(在 2.6+ 的 stdlib 中),但在这种情况下,我们不需要任何花哨的功能;如果其中一个提前完成,我们也没有什么事可做,直到另一个完成。因此,让我们使用 multiprocessing.apply_async

pool = multiprocessing.Pool(2) # we never have more than 2 tasks to run
while i < 1000:
    # parallelly start both tasks
    inner_result = pool.apply_async(setinner, (Q, G, n))
    outer_result = pool.apply_async(setouter, (Q, G, n))

    # sequentially wait for both tasks to finish and get their results
    inneropt,partition,x = inner_result.get()
    outeropt = outer_result.get()

    # the rest of your loop is unchanged

你可能希望将池移到函数外面,这样它就可以永久存在并被代码的其他部分使用。如果不是这样,你几乎肯定希望在函数结束时关闭池。(后来的multiprocessing版本允许你在with语句中直接使用池,但我认为这需要Python 3.2+,所以你必须明确地这样做。)


如果您想要更多并行工作怎么办?嗯,在不重构循环的情况下,这里没有其他明显的事情可做。在从setinner和setouter获取结果之前,您不能执行updateGraph,而这里没有其他慢的地方。
但是,如果您可以重新组织每个循环的setinner,使其独立于之前的所有内容(根据您的算法可能或可能不可能 - 我不知道您在做什么,所以无法猜测),则可以将2000个任务一次性推送到队列中,然后通过仅在需要时获取结果来循环。例如:
pool = multiprocessing.Pool() # let it default to the number of cores
inner_results = []
outer_results = []
for _ in range(1000):
    inner_results.append(pool.apply_async(setinner, (Q,G,n,i))
    outer_results.append(pool.apply_async(setouter, (Q,G,n,i))
while i < 1000:
    inneropt,partition,x = inner_results.pop(0).get()
    outeropt = outer_results.pop(0).get()
    # result of your loop is the same as before

当然,你可以让它更加复杂。

例如,假设你很少需要超过几百次迭代,那么总是计算1000次就会浪费资源。你可以在启动时只推送前N个,并在每次循环时推送一个(或每N次推送N个),这样你永远不会做多于N个的无用迭代——你不能得到完美并行和最小化浪费之间的理想权衡,但通常你可以调整得很好。

此外,如果任务实际上并不需要那么长时间,但你有很多任务,你可能想要将它们批量处理。一种非常简单的方法是使用其中一种map变体而不是apply_async;这可能会使你的获取代码稍微复杂一些,但它使排队和批处理代码完全变得轻松(例如,对于具有10个参数的100个参数列表中的每个func进行chunksize映射只需要两行简单的代码)。


1
对于一些明确定义的用例,例如将两个函数设置为异步运行的情况,lelo助手包可能会有所帮助 - (在这种情况下,它可能只需简单地装饰setinner和setoutter):https://pypi.python.org/pypi/lelo/1.0rc3dev - jsbueno
@jsbueno:看起来很酷。我担心Python中的自动惰性对象,因为实际上不可能覆盖Python中的每个有用表达式,所以你最终可能会得到minval < lazy_result没有实际评估结果的情况。(在C++中,这不是问题,因为(a)您可以重载的事物包括赋值、复制和转换为T&,(b)静态类型通常以Python无法强制执行的方式强制执行严格性。)但显然它在OP的情况下有效,因为他立即解压了一个,并将另一个传递给了abs... - abarnert
@jsbueno:实际上,仔细想想...在OP的情况下,将第一个结果解包成三个变量显然会阻塞,这意味着他需要像我的答案一样重写代码(执行操作,然后获取结果)才能获得任何并行性。不过,只需颠倒两个调用就可以快速解决这个问题。 - abarnert
@abanert - 我不同意每个有用的表达式都不能被覆盖。包“lelo”可能存在错误 - 但是在Python中使用一个值,必须经过“dunder”(__NAME__)方法之一。纯赋值是不可覆盖的,但它们所有赋值所做的就是将延迟对象本身绑定到另一个名称上。当要打印、比较、序列化时,它的值将被获取。 - jsbueno

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接