从numpy数组创建RawArray?

9

我希望能在多个进程之间共享一个numpy数组。这些进程只需要读取数据,因此我希望避免复制数据。如果我从multiprocessing.sharedctypes.RawArray开始,并使用numpy.frombuffer创建numpy数组,我知道如何做。但是如果我最初收到的就是一个numpy数组呢?有没有一种方法可以使用numpy数组的数据初始化RawArray而不复制数据?或者是否有其他方法可以在进程之间共享数据而不复制它?


@christianmbrodbeck 这个答案 解释了如何使用Cython和OpenMP通过共享内存在不同进程中处理相同的数组。 - Saullo G. P. Castro
@christianmbrodbeck 是的,prange 会产生一些开销,如果您创建静态线程,则可以将其最小化。您是正确的... 在这个例子中prange 已经在最外层循环中了。 - Saullo G. P. Castro
@christianmbrodbeck 在那个例子中,你只启动了一个Python线程,而内部使用了多个核心来处理这个线程...在你的问题中,似乎你需要使用许多线程来访问同一个数组中的数据,是这样吗? - Saullo G. P. Castro
你是否考虑过使用线程而不是进程,例如使用multiprocessing.dummy?Numpy在释放GIL(CPython的全局解释器锁)方面做得非常好,而Cython也具有nogil功能。 - user2379410
谢谢@moarningsun。我一直没有使用线程,因为我的代码中有很多Python代码,但也许我应该进行一些分析。 - christianbrodbeck
显示剩余9条评论
4个回答

4
据我所知,在将内存分配给特定进程后,不可能将其声明为共享内存。类似的讨论可以在这里这里(更适合)找到。
让我快速概述您提到的解决方法(从RawArray开始并获取对其的numpy.ndarray引用)。
import numpy as np
from multiprocessing.sharedctypes import RawArray
# option 1
raw_arr = RawArray(ctypes.c_int, 12)
# option 2 (set is up, similar to some existing np.ndarray np_arr2)
raw_arr = RawArray(
        np.ctypeslib.as_ctypes_type(np_arr2.dtype), len(np_arr2)
        )
np_arr = np.frombuffer(raw_arr, dtype=np.dtype(raw_arr))
# np_arr: numpy array with shared memory, can be processed by multiprocessing

如果你必须使用 numpy.ndarray 开始,那么你别无选择,只能复制数据。

import numpy as np
from multiprocessing.sharedctypes import RawArray

np_arr = np.zeros(shape=(3, 4), dtype=np.ubyte)
# option 1
tmp = np.ctypeslib.as_ctypes(np_arr)
raw_arr = RawArray(tmp._type_, tmp)
# option 2
raw_arr = RawArray(np.ctypeslib.as_ctypes_type(np_arr.dtype), np_arr.flatten())

print(raw_arr[:])

这里不需要使用np.frombuffer,你可以直接使用np.asarray,它会自动找到合适的dtype。 - Eric

0

我还有一些你的要求:a) 给定一个大的numpy数组,b) 需要在一堆进程之间共享它,c) 只读等。为此,我一直在使用类似于以下的东西:

mynparray = #initialize a large array from a file
shrarr_base_ptr = RawArray(ctypes.c_double, len*rows*cols)
shrarr_ptr = np.frombuffer(shrarr_base_ptr)
shrarr_ptr = mynparray

在我的情况下,mynparray 是三维的。至于实际共享,我使用了以下样式,目前它可以正常工作。

    inq1 = Queue()
    inq2 = Queue()  
    outq = Queue()
    p1 = Process(target = myfunc1, args=(inq1, outq,))
    p1.start()
    inq1.put((shrarr_ptr, ))
    p2 = Process(target = myfunc2, args=(inq2, outq,))
    p2.start()
    inq2.put((shrarr_ptr,))
    inq1.close()
    inq2.close()
    inq1.join_thread()
    inq2.join_thread()
    ....

1
在你的代码行shrarr_ptr = mynparray中,你将原始的numpy数组赋值给了变量名shrarr_ptr。当你之后执行inq1.put((shrarr_ptr,))时,你通过Queue发送了整个numpy数组... - christianbrodbeck
不,RawArray来自sharedctypes,因此对象是在共享内存中创建并继承的。我并没有物理地发送整个数组。此外,根据我的经验,通过队列发送这样大的对象将需要很长时间。这是我上面代码的灵感来源:https://dev59.com/HWsz5IYBdhLWcg3wcXbF。 - Jey
1
@christianbrodbeck 是对的,代码中的 shrarr_ptr = mynparray 这行是有问题的。我认为应该改成 shrarr_ptr[:] = mynparray[:],这样才能把数据复制到新的共享内存中。 - coderforlife
为了解决数据传回原始numpy数组的问题,这是不可能的。一个解决方案是提前分配基于RawArray的ndarray,并将其用作创建原始ndarray的函数的out参数(如果可能)。 - coderforlife
我同意@christianbrodbeck的观点。这个答案只适用于np.ndarray,并且其中有两行不必要的代码。可以通过print(id(mynparray)print(id(shrarr_base_ptr)进行检查。 - Markus Dutschke

0
请注意,如果您计划使用numpy数组,则可以完全省略RawArray,并使用以下内容:
from multiprocessing.heap import BufferWrapper

def shared_array(shape, dtype):
    dt = np.dtype((dtype, shape))
    wrapper = BufferWrapper(dt.itemsize)
    mem = wrapper.create_memoryview()

    # workaround for bpo-41673 to keep `wrapper` alive
    ct = (ctypes.c_ubyte * dt.itemsize).from_buffer(mem)
    ct._owner = wrapper
    mem = memoryview(ct)

    return np.asarray(mem).view(dt)

这种方法的优点在于它可以在 np.ctypeslib.as_ctypes_type 失败的情况下正常工作。


-1

我不确定这是否会在内部复制数据,但您可以传递平坦的数组:

a = numpy.random.randint(1,10,(4,4))
>>> a
array([[5, 6, 7, 7],
       [7, 9, 2, 8],
       [3, 4, 6, 4],
       [3, 1, 2, 2]])

b = RawArray(ctypes.c_long, a.flat)
>>> b[:]
[5, 6, 7, 7, 7, 9, 2, 8, 3, 4, 6, 4, 3, 1, 2, 2]

无法工作,从内存使用情况来看,修改b也不会影响a... - christianbrodbeck
最初未分配为“共享”的内存无法轻易地变为共享。您需要将数据复制到共享内存块中,或使用@Jey的方法,使数组最初基于共享内存,因此始终是共享的。 - coderforlife

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接