Python多进程和共享numpy数组

5

我有一个问题,类似于这个:

import numpy as np

C = np.zeros((100,10))

for i in range(10):
    C_sub = get_sub_matrix_C(i, other_args) # shape 10x10
    C[i*10:(i+1)*10,:10] = C_sub

显然,不需要将其作为串行计算运行,因为每个子矩阵可以独立计算。

我想使用multiprocessing模块并为for循环创建多达4个进程。

我读了一些关于multiprocessing的教程,但无法弄清楚如何使用它来解决我的问题。

感谢您的帮助。


2
为了使多进程能够提高性能,计算必须需要花费相当长的时间。因为多进程将会对数据进行序列化,发送到子进程中,反序列化并执行计算,再次序列化结果,将其发送回主进程,最后再次反序列化。序列化/反序列化需要相当长的时间,加上进程间通信也不是很快。如果 get_sub_matrix 只是几个矩阵访问,那么你不会获得任何加速。 - Bakuriu
这只是举例而已。最终,我的矩阵将具有大约100000 x 20000的尺寸,但更重要的是,get_sub_matrix_C函数的速度有点慢,我认为我无法使该函数更快。 - RoSt
get_sub_matrix_C需要访问整个矩阵还是只访问子矩阵?如果需要访问整个矩阵,那么为每个子进程序列化一个大矩阵的副本将非常耗费时间和内存。 - eguaio
实际上,get_sub_matrix_C不依赖于C的任何条目。它只是提供我要写入C的子矩阵,其中i确定“位置”。 - RoSt
2个回答

4
使用一个进程池Pool是并行化该代码的一种简单方法:
pool = multiprocessing.Pool()
results = pool.starmap(get_sub_matrix_C, ((i, other_args) for i in range(10)))

for i, res in enumerate(results):
    C[i*10:(i+1)*10,:10] = res

我使用了starmap,因为get_sub_matrix_C函数有多个参数(starmap(f, [(x1, ..., xN)])调用f(x1, ..., xN))。
请注意,序列化/反序列化可能需要很长时间和空间,因此您可能需要使用更低级别的解决方案来避免这种开销。
看起来您正在运行过时的Python版本。您可以将starmap替换为普通的map,但是您必须提供一个接受单个参数的函数:
def f(args):
    return get_sub_matrix_C(*args)

pool = multiprocessing.Pool()
results = pool.map(f, ((i, other_args) for i in range(10)))

for i, res in enumerate(results):
    C[i*10:(i+1)*10,:10] = res

谢谢你的回答。不幸的是,我无法测试它,因为我没有星图。可能我正在使用过时的多进程版本?版本:0.70a1 - RoSt
@RoSt 你可以使用 map 函数并修改函数以接受单个参数。我已经编辑了答案,添加了这个解决方案。 - Bakuriu
谢谢你提供简单明了的解决方案。它很有效。我想为您投票,但是我的声望不足15,抱歉... - RoSt

0

以下的方法或许可以完成任务。如有疑问,请随时询问。

import numpy as np
import multiprocessing

def processParallel():

    def own_process(i, other_args, out_queue):
        C_sub = get_sub_matrix_C(i, other_args)
        out_queue.put(C_sub)            

    sub_matrices_list = []
    out_queue = multiprocessing.Queue()
    other_args = 0
    for i in range(10):
        p = multiprocessing.Process(
                            target=own_process,
                            args=(i, other_args, out_queue))
        procs.append(p)
        p.start()

    for i in range(10):
        sub_matrices_list.extend(out_queue.get())

    for p in procs:
        p.join()

    return sub_matrices_list    

C = np.zeros((100,10))

result = processParallel()

for i in range(10):
    C[i*10:(i+1)*10,:10] = result[i]

谢谢你的回答。我尝试了,但是结果很混乱。相同的条目一遍又一遍地重复出现。 - RoSt
1
我刚刚修复了那个错误,抱歉。不过,另一个答案似乎更简洁实用。我也会自己尝试一下! :) - eguaio

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接