在Python多进程中传递共享内存变量

3
我有一堆文件,想要使用Python的多进程读取并将所有数据收集到单个NumPy数组中。为此,我想定义一个共享内存NumPy数组,并将其切片传递给不同的进程以并行读取。下面的代码是我尝试使用多进程修改numpy数组的玩具示例。
示例1:

import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    # Need to fill this array in parallel
    arr = np.zeros(4)
    p = multiprocessing.Pool(4)
    # Passing slices to arr to modify using multiprocessing
    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)


在这段代码中,我希望数组arr被填充为0、1、2、3。然而,输出结果显示arr全是0。在阅读了这里的答案后,我使用了multiprocessing.Array来定义共享内存变量,并修改了我的代码如下:
例子二:
import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    # Shared memory Array
    shared = multiprocessing.Array('d', 4)
    arr = np.ctypeslib.as_array(shared.get_obj())

    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)


这也会在 arr 中打印所有的零。然而,当我在 main 之外定义数组并使用 pool.map 时,代码可以正常工作。例如,下面的代码可以正常工作。
示例 3:
import numpy as np
import multiprocessing

shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())

def do_stuff(i):
    arr[i]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    shared = multiprocessing.Array('d', 4)
    p.map(do_stuff, idx)
    p.close()
    p.join()
    print(arr)
             

这将打印出 [0,1,2,3]。
我对所有这些感到非常困惑。我的问题是:
1. 当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?当我将这个数组的切片发送到不同的处理器时,如果这个变量在那些处理器上没有定义,会发送什么?
2. 为什么示例2不起作用,而示例3起作用?
我正在使用Linux和Python/3.7/4。
1个回答

1
当我定义arr = np.zeros(4)时,哪个处理器拥有这个变量?
只有主进程才能访问它。如果您使用“fork”作为启动方法,所有内容都将对子进程可用,但是一旦尝试修改某些内容,它将被复制到自己的私有内存空间中,然后再进行修改(写时复制)。如果您有大型只读数组,则可以减少开销,但不能帮助您将数据写回到这些数组中。
如果在这些处理器上未定义此变量,会发送什么?
当参数通过管道和pickle从主进程发送并重建时,在子进程中创建一个新数组。数据被序列化为文本并重建,因此除了切片中数据的值之外,没有其他信息。这是一个全新的对象。
为什么示例2不起作用而示例3起作用?
例子3之所以有效,是因为在“fork”(调用Pool的时刻),arr已经被创建并共享。重要的是你使用了一个Array来创建它,因此当你尝试修改数据时,数据会被共享(具体机制很复杂)。
例子2与例子1类似,不起作用:你将数组的一个切片作为参数传递,它被转换为一个全新的对象,所以在do_stuff函数中的arr只是从主进程中的arr[i:i+1]复制而来。仍然很重要的是,在调用Pool之前创建任何将在进程之间共享的内容(如果你依赖于“fork”来共享数据),但这不是这个例子不起作用的原因。
你应该知道:例子3之所以有效,是因为你在Linux上,并且默认的启动方法是fork。由于在锁定状态下复制锁对象可能会导致死锁的可能性,这不是首选的启动方法。这在Windows上根本不起作用,在3.8及以上版本的MacOS上默认也不起作用。
最好的解决方案(最便携)是将Array本身作为参数传递,并在子进程内重新构造numpy数组。这会导致“共享对象”只能在创建子进程时作为参数传递的复杂性。如果使用Process,这并不是很大的问题,但是对于Pool,您基本上必须将任何共享对象作为参数传递给初始化函数,并将重新构建的数组作为子进程范围的全局变量。例如,在这个例子中,如果尝试使用p.mapp.applybuf作为参数传递,将会出现错误,但是当将buf作为initargs=(buf,)传递给Pool()时,则不会出现错误。
import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr #use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i]=i

if __name__ == '__main__':
    idx = [0,1,2,3]
    
    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0
    
    #"with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
    print(arr)

在3.8及以上版本中,有一个新模块比Array或任何其他sharedctypes类更好,叫做:shared_memory。这个使用起来有点复杂,并且有一些额外的依赖于操作系统的问题,但从理论上讲,它的开销更低,速度更快。如果你想深入了解,我已经写了几篇 关于 shared_memory的文章,并最近回答了很多关于并发的问题,如果你想看看我过去一个月或两个月的答案。


感谢您详细的回答,对我理解这个问题帮助很大。您提出的解决方案有效。但是,您说我问题中的示例2不起作用,因为数组是在分叉之后才创建的。我尝试将arr定义移动到调用Pool之前,但是这个示例仍然不起作用。您能解释一下这可能是为什么吗?我也会查看您分享的链接以改善我的理解。 - Deepak Dalakoti
@DeepakDalakoti 示例2与示例1大多相同。在do_stuff函数的本地范围内,参数会覆盖全局作用域中的arr,因此arr在全局作用域中是什么并不重要。如果省略传递arr,它将不会被覆盖,您将获得全局版本。我有点忽略了这一点,可能应该在我的答案中提到它... - Aaron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接