多进程中的共享内存对象

157
假设我有一个大的内存numpy数组,我有一个名为 func 的函数,它将这个巨大的数组作为输入(以及一些其他参数)。使用不同参数运行 func 可以并行进行。例如:
def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

如果我使用多进程库,那么巨大的数组将会被复制多次到不同的进程中。

有没有办法让不同的进程共享同一个数组?这个数组对象是只读的,永远不会被修改。

更复杂的是,如果arr不是一个数组,而是任意的Python对象,是否有一种方法可以共享它呢?

[编辑]

我看了答案,但仍然有些困惑。由于fork()是写时复制,所以在Python多进程库中生成新进程时不应该产生任何额外的开销。但以下代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

输出(顺便提一下,随着数组大小的增加,成本也会增加,因此我怀疑仍然存在与内存复制相关的开销):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

为什么即使我们没有复制数组,仍然存在如此巨大的开销?共享内存又有什么作用呢?

你已经查看了文档,对吧?(http://docs.python.org/library/multiprocessing.html#multiprocessing.Array) - Lev Levitsky
@FrancisAvila 有没有一种方法可以共享任意的Python对象,而不仅仅是数组? - CuriousMind
1
@LevLevitsky 我必须问一下,是否有一种方法可以共享任意的Python对象,而不仅仅是数组? - CuriousMind
2
这个答案很好地解释了为什么任意的Python对象不能被共享。 - Janne Karila
4个回答

147
如果您使用的操作系统使用写时复制的fork()语义(像任何常见的unix),只要您不修改数据结构,它将对所有子进程可用,而不会占用额外的内存。您不必做任何特殊的事情(除了确保绝对不修改对象)。
对于您的问题,您可以做的最有效的事情是将数组打包到高效的数组结构中(使用numpyarray),将其放置在共享内存中,用multiprocessing.Array包装它,并将其传递给您的函数。此答案显示如何执行此操作
如果您想要一个可写的共享对象,那么您需要使用某种同步或锁来包装它。{multiprocessing}提供了两种方法:一种使用共享内存(适用于简单值、数组或ctypes),另一种是Manager代理,其中一个进程持有内存,而管理器从其他进程(甚至跨网络)仲裁对其的访问。 Manager方法可以用于任意Python对象,但比使用共享内存的等效方法更慢,因为需要对对象进行序列化/反序列化并在进程之间发送。
在Python中有大量并行处理库和方法可用。 multiprocessing是一个优秀且全面的库,但如果您有特殊需求,也许其他方法可能更好。

1
多进程使用pickling和消息传递与子进程通信,因此通过将数组作为“apply_async”的参数包含在内,它被序列化并复制。为了避免复制,您需要在调用“apply_async”之前创建对该对象的引用(并且不要在参数列表中包括“arr”),或者将其包装在“multiprocessing.Array”中并传递。 - Francis Avila
35
只是注意,在Python中fork()实际上意味着访问时复制(因为仅访问对象将更改其引用计数)。 - Fabio Zadrozny
3
它实际上会复制整个对象,还是只复制包含其引用计数的内存页? - zigg
5
据我所知,只有包含引用计数的内存页面被访问时才会被加载(因此每个对象访问会加载4KB的内存页面)。 - Fabio Zadrozny
3
如何使用闭包?你提供给 apply_async 的函数应该是可序列化的吗?或者这只是 map_async 的限制? - GermanK
显示剩余4条评论

18
这是 Ray 的预期使用情况,它是一个用于Python并行和分布式的库。在底层,它使用Apache Arrow数据布局(这是一种零复制格式)来序列化对象,并将它们存储在共享内存对象存储器中,以便多个进程可以访问它们而不创建副本。

代码看起来像下面这样。

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

如果您不调用ray.put,那么数组仍将存储在共享内存中,但这将在每次调用func时执行一次,这不是您想要的。

请注意,这不仅适用于数组,也适用于包含数组的对象,例如下面将整数映射到数组的字典。

您可以通过在IPython中运行以下命令来比较Ray与pickle中序列化的性能。

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

使用Ray进行序列化只比pickle略快,但由于使用了共享内存,反序列化速度快了1000倍(这个数字当然取决于对象)。

请参阅 Ray文档 。 您可以阅读有关使用Ray和Arrow进行快速序列化的更多信息。 注意我是Ray开发人员之一。


2
Ray听起来不错!但是,我之前尝试过使用这个库,但不幸的是,我刚刚意识到Ray不支持Windows。希望你们能尽快支持Windows。谢谢开发者! - Hzzkygcs
1
Ray支持Windows,并且很稳定,但需要安装VC++运行时库,不过这很容易,因为Visual Studio已经自带了。 - Corneliu Maftuleac
Ray目前支持MacOS和Linux。Windows的安装包现已推出,但是Windows支持仍处于实验阶段并正在开发中。- 来自Ray网站最新消息 https://docs.ray.io/en/latest/installation.html - KawaiKx
2
Ray现在支持Windows :) - Robert Nishihara
@RobertNishihara 我尝试在共享内存中加载字典列表,但结果发现零拷贝仅适用于数值numpy数组。有没有办法可以使用Ray在进程之间共享此数据集而不复制它?我使用了多进程管理器,但读取速度太慢了。 - Cypheon

17
我遇到了同样的问题,并编写了一个小型共享内存实用程序类来解决它。
我正在使用(无锁),并且对数组的访问也根本没有同步(无锁),请小心不要伤到自己。
使用这个解决方案,我在四核i7上加速了约3倍。
以下是代码: 随意使用和改进它,并请报告任何错误。
'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

刚刚意识到,在创建多进程池之前,必须先设置共享内存数组,还不知道为什么,但肯定不能反过来做。 - martin.preinfalk
原因是Multiprocessing pool在实例化时调用fork(),因此之后的任何内容都无法访问创建后的任何共享内存指针。 - Xiv
1
当我在py35下尝试运行这段代码时,我在multiprocessing.sharedctypes.py中得到了异常,因此我猜测这段代码仅适用于py2。 - Dr. Hillier Dániel

6

正如Robert Nishihara所提到的,Apache Arrow使得这个过程变得容易,特别是通过Plasma内存对象存储器,而Ray就是基于它构建的。

我专门为此创建了brain-plasma - 在Flask应用程序中快速加载和重新加载大型对象。它是一个共享内存对象命名空间,用于Apache Arrow可序列化对象,包括由pickle.dumps(...)生成的pickle字节串。

Apache Ray和Plasma的关键区别在于它会为您跟踪对象ID。任何在本地运行的进程、线程或程序都可以通过从任何Brain对象调用名称来共享变量的值。

$ pip install brain-plasma

$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接