用多进程并发地填充一个字典

16
昨天我提了一个问题:使用multiprocess并行读取数据
我得到了非常好的答案,我实施了标为正确的答案中提到的解决方案。
def read_energies(motif):
    os.chdir("blabla/working_directory")
    complx_ener = pd.DataFrame()
    # complex function to fill that dataframe 
    lig_ener = pd.DataFrame()
    # complex function to fill that dataframe 
    return motif, complx_ener, lig_ener

COMPLEX_ENERGIS = {}
LIGAND_ENERGIES = {}
p = multiprocessing.Pool(processes=CPU)
for x in p.imap_unordered(read_energies, peptide_kd.keys()):
    COMPLEX_ENERGIS[x[0]] = x[1]
    LIGAND_ENERGIES[x[0]] = x[2]

然而,这个解决方案所需的时间与我只是遍历peptide_kd.keys()并逐个填充DataFrames的时间相同。为什么会这样?有没有一种方法可以并行填充所需的字典并实际获得速度提升?我正在48核HPC上运行它。


使用多进程的开销可能比进行复杂函数处理的开销更大。也许让read_energies()每次处理可变数量的数据框,可以使您调整到优势点。 - martineau
1个回答

18

你在启动每个进程和跨多个进程复制pandas.DataFrame等数据时产生了大量开销。如果你只需要并行填充一个字典,我建议使用共享内存字典。如果没有键被覆盖,那么这很容易,你不必担心锁问题。

(请注意,以下示例中我使用的是multiprocess,它是multiprocessing的一个分支 - 但仅为了从解释器演示,否则您必须从__main__运行以下内容.)

>>> from multiprocess import Process, Manager
>>> 
>>> def f(d, x):
...   d[x] = x**2
... 
>>> manager = Manager()
>>> d = manager.dict()
>>> job = [Process(target=f, args=(d, i)) for i in range(5)]
>>> _ = [p.start() for p in job]
>>> _ = [p.join() for p in job]
>>> print d
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16}

这个方案没有对 dict 进行复制来在进程之间共享,从而减少了一部分开销。对于像 pandas.DataFrame 这样的大型对象,与简单操作如 x**2 的成本相比,它可能是显著的。同样,生成一个 Process 可能需要时间,您可以通过使用线程(即来自 multiprocess.dummy 而不是 multiprocess)来加速上述过程(对于轻量级对象),甚至可能更快,无论是使用您最初发布的解决方案还是我上面的解决方案。

如果您确实需要共享 DataFrames(正如您的代码所暗示的而不是问题所询问的),您可以通过创建共享内存 numpy.ndarray 来完成。


谢谢您的回答!我现在要尝试一下,但首先我想问一些问题。我不明白所提到的“共享”数据帧(变量,我猜)。为什么我的代码意味着我使用了一个共享DataFrame?我想要并行执行的工作就像您描述的那样,填充一个字典,并在以后以不同的方式使用它(读取其中的数据),但不更改其中的任何内容。 - Gábor Erdős
我提到你可能需要研究共享内存数组的原因是,你从每个进程中返回了两个 DataFrame 实例。然而,由于你只展示了元代码,很难告诉你是否需要这样做。 - Mike McKerns
哦,我明白了。我需要这两个“DataFrames”。返回两个是否有问题?分两步完成会更容易吗? - Gábor Erdős
不返回两个值并不会有问题,只是比较耗费资源。你可以创建两个共享内存数组,然后根据需要填充 DataFrame.values - Mike McKerns
@Curious:每次调用Process都会创建一个新的进程,因此我使用range(5)设置进程数。 - Mike McKerns
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接