在Python的多进程中共享字典

5

在我的程序中,我需要在Python的多进程之间共享一个字典。我将代码简化,以便在此处提供示例:

import multiprocessing
def folding (return_dict, seq):
    dis = 1
    d = 0
    ddg = 1 
    '''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
    return_dict [seq] = [dis, d, ddg]

seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', atcggatcg', agctgctagct']
manager = Manager()
return_dict = manager.dict()
n_cores = 3

for i in range (0, len(seqs), n_cores) #n_cores is the number of cores availables in the computer, defined by the user
    subseqs = seqs[i:i + n_cores]
    processes = [Process(target=folding, args =(return_dict, seq)) for seq in subseqs]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

for i in retun_dict:
    print i

我希望在程序结束时能够得到包含所有属性值的return_dict。运行程序时,需要处理数千个序列并多次重复操作。有时可以得到正确的结果,但大部分情况下程序无法结束,也没有任何错误提示,我不知道出了什么问题。此外,我认为这种方法在时间上并不是很有效率,想知道是否有更快更高效的方式来实现。谢谢大家!

你用的是什么操作系统?Windows吗? - SmCaterpillar
1个回答

14

在修复了一些小的语法错误之后,您的代码似乎可以工作。

但是,我建议使用多进程池代替您的自定义解决方案,以始终同时运行n_cores个进程。您的方法存在的问题是,在启动下一批进程之前,所有进程都需要完成。取决于计算folding所需时间的变化程度,您可能会遇到瓶颈。在最坏的情况下,这意味着与单核处理相比没有任何加速。

此外,您的程序会在Windows下遇到严重问题。您需要确保您的主模块可以安全地导入,而不需要重新运行您的多进程代码。也就是说,您需要通过if __name__ == '__main___'来保护您的主要入口点,这可能已经在其他Python脚本中看到过。这将确保您受保护的代码仅在您的脚本被解释器作为文件启动时执行,即仅执行一次,而不是每个新生成的子进程都执行。

以下是我稍微更改的使用池的代码:

import multiprocessing as mp


def folding(return_dict, seq):
    dis = 1
    d = 0
    ddg = 1
    '''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
    return_dict[seq] = [dis, d, ddg]


def main():
    seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', 'atcggatcg', 'agctgctagct']
    manager = mp.Manager()
    return_dict = manager.dict()
    n_cores = 3

    # created pool running maximum 3 cores
    pool = mp.Pool(n_cores)

    # Execute the folding task in parallel
    for seq in seqs:
        pool.apply_async(folding, args=(return_dict, seq))

    # Tell the pool that there are no more tasks to come and join
    pool.close()
    pool.join()

    # Print the results
    for i in return_dict.keys():
        print(i, return_dict[i])


if __name__ == '__main__':
    # Protected main function
    main()

而这将被打印出来

atcgtg [1, 0, 1]
atcgatcgatc [1, 0, 1]
agcgatcg [1, 0, 1]
atcggatcg [1, 0, 1]
agctgctagct [1, 0, 1]

没有共享数据的示例

编辑: 在您的情况下实际上不需要共享数据结构。您可以简单地依赖于池的map函数。map接受一个可迭代对象,然后使用可迭代对象中的所有元素调用函数folding一次。使用map而不是map_async的优点在于结果按照输入的顺序排列。但是,在您可以处理它们之前,您需要等待收集所有结果。

这里是使用map的示例。请注意,您的函数folding现在返回结果,而不是将其放入共享字典中:

import multiprocessing as mp


def folding(seq):
    dis = 1
    d = 0
    ddg = 1
    '''This is irrelevant, actually my program sends seq parameter to other extern program that returns dis, d and ddg parameters'''
    # Return results instead of using shared data
    return [dis, d, ddg]


def main():
    seqs = ['atcgtg', 'agcgatcg', 'atcgatcgatc', 'atcggatcg', 'agctgctagct']
    n_cores = 3

    pool = mp.Pool(n_cores)

    # Use map which blocks until all results are ready:
    res = pool.map(folding, iterable=seqs)

    pool.close()
    pool.join()

    # Print the results
    # Using list(zip(..)) to print inputs next to outputs
    print(list(zip(seqs, res)))


if __name__ == '__main__':
    main()

而这个会打印

[('atcgtg', [1, 0, 1]), ('agcgatcg', [1, 0, 1]), ('atcgatcgatc', [1, 0, 1]), ('atcggatcg', [1, 0, 1]), ('agctgctagct', [1, 0, 1])]

非常感谢!您的提示看起来非常有用!我正在尝试在我的程序中使用它,看起来运行得很好。我正在使用LINUX,所以我没有__main__问题,但实现起来更加困难,因为多进程是在另一个更复杂的函数中,这是一个大型程序! - Irene Díaz
也许当您将数据分配给共享字典时,您的工作进程会崩溃。您可以尝试使用日志记录来查看工作进程是否以及如何崩溃。这里(https://dev59.com/NIjca4cB1Zd3GeqPsivH)是一个示例,说明如何记录子进程中的错误并获取完整的堆栈跟踪信息。 - SmCaterpillar
另一个想法是,你说实际上调用了另一个软件来完成(蛋白质?)折叠。也许那个软件碰巧卡住/崩溃了,在第一种情况下你只是不走运而已!? - SmCaterpillar
我会尝试记录日志,谢谢。是的,其他软件(RNA折叠)可能会崩溃,但我使用stderr捕获这些错误,所以我很确定这不是问题,几乎不总是问题。 - Irene Díaz
是的,也许这可能是问题所在。我正在尝试寻找其他解决方案(或其他问题),因为我认为如果问题是RAM要求,唯一的解决方案就是更换计算机。 - Irene Díaz
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接