如何使用Python多进程利用所有核心

36

我已经在使用Python的multiprocessing功能上研究了一个小时以上,尝试使用multiprocessing.Processmultiprocessing.Manager来并行化一个相当复杂的图遍历函数:

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel=True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks if 'memorizedPaths exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__),
                  'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),
                    key=itemgetter(1),
                    reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []

    if cutoff < 1:
        return

    visited = [source]
    stack = [iter(DG[source])]

    while stack:
        children = stack[-1]
        child = next(children, None)

        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and
                    (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))

                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: # len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and
                (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    # writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=" ", quotechar="|")
        for path in uniqueTreePaths:
            writer.writerow(path)

    memorizedPaths[source] = uniqueTreePaths

############################################################################

if __name__ == '__main__':
    start = time.clock()

    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(DG, cutoff, item, memorizedPaths, filepaths))
        test.start()
        test.join()

end = time.clock()
print (end-start)

目前,尽管是通过运气和魔法,它(某种程度上)起作用了。我的问题是,我只使用了24个核心中的12个。

有人能解释一下这可能是什么原因吗?也许我的代码不是最佳的多进程解决方案,或者这是我的架构Intel Xeon CPU E5-2640 @ 2.50GHz x18 running on Ubuntu 13.04 x64的一个特性?

编辑:

我设法获得:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph,
                  args=(DG, cutoff, item, memorizedPaths, filepaths))
p.close()
p.join()

工作起来非常缓慢!因此我认为我正在使用错误的函数。希望这可以澄清我要完成的任务!

编辑2:.map 尝试:

partialfunc = partial(_all_simple_paths_graph,
                      DG=DG,
                      cutoff=cutoff,
                      memorizedPaths=memorizedPaths,
                      filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()

工作正常,比单核慢。需要优化时间!


3
你等待每个新进程结束(test.join())后循环才开始另一个进程,这看起来很奇怪。尝试在加入任何一个进程之前至少启动24个进程。 - Tim Peters
是的,我只是在添加函数来看看哪些有效... :D 有没有办法让进程数与计算机上的处理器数量相同?(即是否有用于 CPU 的系统变量?)另外,如果我启动24个进程,它们会在空闲时不断添加更多节点吗?我有4700个节点要排序!我假设不行,这似乎是一厢情愿的想法。有没有办法做到这样的事情? - Darkstarone
类似于 R 中的 mclapply - Darkstarone
1
您可能还想考虑使用Parallel Python - John Y
1个回答

65

这里需要处理的问题太多了,不适合在评论中解决。对于mp表示multiprocessing:

mp.cpu_count() 应该返回处理器数量。但是要测试一下。有些平台很复杂,这个信息并不总是容易得到的。Python尽力而为。

如果您启动24个进程,它们将完全按照您的指示执行;-) 看起来使用 mp.Pool() 对您来说最方便。您可以将要创建的进程数传递给其构造函数。 mp.Pool(processes=None) 将使用 mp.cpu_count() 作为处理器数量。

然后,您可以在Pool实例上使用例如.imap_unordered(...),将您的degreelist分布在多个进程之间。或者也许其他一些Pool方法对您来说效果更好-请尝试。

如果您无法将问题纳入Pool的视野中,那么您可以创建一个mp.Queue来创建工作队列,并在主程序中.put()节点(或节点的切片,以减少开销),并编写工作程序以从该队列中获取工作项。如果需要示例,请询问。请注意,您需要在所有“真实”工作项之后将哨兵值(每个进程一个)放在队列上,以便工作进程可以测试哨兵来知道何时完成。

顺带一提,我喜欢队列,因为它们更明确。许多人更喜欢Pool,因为它们更神奇;-)

示例

这里有一个可执行的原型供您参考。这展示了使用imap_unorderedchunksize的一种方式,不需要更改任何函数签名。当然,您需要插入您的真实代码;-) 请注意,init_worker方法允许仅一次传递“大部分”参数,而不是针对您的每个degreeslist项目都传递一次。减少进程间通信的数量对于速度至关重要。

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

我强烈建议按照原样运行此代码,以便您可以看到其运行速度非常快。然后逐步添加内容,以查看如何影响时间。例如,只需添加

   memorizedPaths[item] = item

对于_all_simple_paths_graph()函数的调用会极大地降低速度。为什么呢?因为每次添加后字典都会变得越来越大,并且这个安全的进程共享字典必须在所有进程之间同步(在底层进行)。同步的单位是“整个字典” - mp机制无法利用任何内部结构来对共享字典进行增量更新。

如果你承受不起这样的代价,那么就不能在此使用Manager.dict()。有许多巧妙的解决方法可供选择;-)


1
谢谢您的回复!我正在努力解决这个问题。似乎使用Pool并不可行,因为我有5个参数,而且我不确定如何让它接受多个参数(我正在使用Python 2.7.3的pypy,所以没有starmap,也不确定如何使用那么多参数来使partial函数正常运行!您能添加一个.map例子或一个.Queue()的示例吗? - Darkstarone
添加了一段多进程代码片段,虽然可以运行,但比单核运行时间慢得多。 - Darkstarone
嗯,我真的无论如何都无法让它工作,你能提供一个map函数的元组示例吗?我只想澄清语法,并确定问题是否出在其他地方! - Darkstarone
修改了EDIT2,因为我已经让它工作了。我不知道为什么要传递字符串 - 今天很长 - 但它仍然很慢,是时候研究块了!感谢您的所有帮助。您的名字将永远被注释在我的代码中! - Darkstarone
1
请查看我回答的编辑 - 有更简单的方法可以继续操作;-) 祝你好运! - Tim Peters
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接