使用多进程/线程将numpy数组操作分成块

10

我有一个函数,用于呈现一个MxN的数组。 由于该数组非常大,因此我希望使用该函数同时生成小型数组(M1xN、M2xN、M3xN - MixN。M1 + M2 + M3 + ... + Mi = M),并使用多进程/线程并行处理这些小型数组,最终将它们合并为mxn数组。正如Mr. Boardrider所建议的,提供一个可行的示例,以下示例可以广泛传达我的意图。

import numpy as n
def mult(y,x):
    r = n.empty([len(y),len(x)])
    for i in range(len(r)):
        r[i] = y[i]*x
    return r
x = n.random.rand(10000)
y = n.arange(0,100000,1)
test = mult(y=y,x=x)
随着 xy 的长度增加,系统将需要更多的时间。关于这个例子,我想运行这段代码,以便如果我有4个核心,我可以将工作分成四份,即将计算元素 r[0]r[24999] 的工作分配给第一个核心,将r[25000]r[49999] 的工作分配给第二个核心,将r[50000]r[74999] 的工作分配给第三个核心,将r[75000]r[99999] 的工作分配给第四个核心。最终将结果合并,附加到一个单一的数组 r[0]r[99999] 中。

我希望这个例子能让事情变得清晰明了。如果我的问题还不清楚,请告诉我。


1
怎么样使用 [mcve]? - boardrider
你在Python中永远无法比NumPy的内部广播机制更快地编写程序,即使它是多线程/进程...让NumPy在内部处理。 - Aaron
请注意不要为了使用多个线程/进程而盲目行事。在大量数据上执行少量工作只会导致CPU被内存总线速度拖慢(与CPU缓存等相比,它很慢)。因此,如果您的算法受到I/O限制,则添加更多线程不会提高速度。 - bazza
1个回答

9
首先要说的是:如果涉及到同一处理器上的多个核心,numpy 已经能够比我们手动并行化操作更好地完成此操作(请参见 Python 中大型数组的乘法 的讨论)。
在这种情况下,关键是确保乘法是通过批量数组操作而不是 Python 的 for 循环完成的。
test2 = x[n.newaxis, :] * y[:, n.newaxis]

n.abs( test - test2 ).max()  # verify equivalence to mult(): output should be 0.0, or very small reflecting floating-point precision limitations

如果你想要将这个操作分配到多个独立的CPU上执行,那就是另外一回事了,但问题似乎暗示只有一个(多核)CPU。因此,考虑到这一点:假设你想并行化一个比mult()更复杂的操作。假设你已经努力优化了这个操作,使它变成numpy可以自己并行化的批量数组操作,但你的操作还是不容易实现。在这种情况下,你可以使用一个带有lock=False参数的共享内存multiprocessing.Array,以及multiprocessing.Pool来分配进程以处理它的非重叠部分,这些部分被分割到y维度(如果需要,也可同时划分到x维度)。以下是一个示例清单。请注意,这种方法并不明确地执行你指定的操作(将结果合并并追加到单个数组中)。相反,它做得更高效:多个进程同时在共享内存的非重叠区域中组装它们各自部分的答案。完成后,不需要整理/附加操作:我们只需读出结果即可。
import os, numpy, multiprocessing, itertools

SHARED_VARS = {} # the best way to get multiprocessing.Pool to send shared multiprocessing.Array objects between processes is to attach them to something global - see https://dev59.com/BXI-5IYBdhLWcg3wxruQ

def operate( slices ):
    # grok the inputs
    yslice, xslice = slices
    y, x, r = get_shared_arrays('y', 'x', 'r')
    # create views of the appropriate chunks/slices of the arrays:
    y = y[yslice]
    x = x[xslice]
    r = r[yslice, xslice]
    # do the actual business
    for i in range(len(r)):
        r[i] = y[i] * x  # If this is truly all operate() does, it can be parallelized far more efficiently by numpy itself.
                         # But let's assume this is a placeholder for something more complicated.

    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)

def check(y, x, r):
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis]  # obviously this check will only be valid if operate() literally does only multiplication (in which case this whole business is unncessary)
    print( 'max. abs. diff. = %g' % numpy.abs(r - r2).max() )
    return y, x, r

def slicestr(s):
    return ':'.join( '' if x is None else str(x) for x in [s.start, s.stop, s.step] )

def m2n(buf, shape, typecode, ismatrix=False):
    """
    Return a numpy.array VIEW of a multiprocessing.Array given a
    handle to the array, the shape, the data typecode, and a boolean
    flag indicating whether the result should be cast as a matrix.
    """
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
    if ismatrix: a = numpy.asmatrix(a)
    return a

def n2m(a):
    """
    Return a multiprocessing.Array COPY of a numpy.array, together
    with shape, typecode and matrix flag.
    """
    if not isinstance(a, numpy.ndarray): a = numpy.array(a)
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)

def new_shared_array(shape, typecode='d', ismatrix=False):
    """
    Allocate a new shared array and return all the details required
    to reinterpret it as a numpy array or matrix (same order of
    output arguments as n2m)
    """
    typecode = numpy.dtype(typecode).char
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix

def get_shared_arrays(*names):
    return [m2n(*SHARED_VARS[name]) for name in names]

def init(*pargs, **kwargs):
    SHARED_VARS.update(pargs, **kwargs)

if __name__ == '__main__':

    ylen = 1000
    xlen = 2000

    init( y=n2m(range(ylen)) )
    init( x=n2m(numpy.random.rand(xlen)) )
    init( r=new_shared_array([ylen, xlen], float) )

    print('Master process ID is %s' % os.getpid())

    #print( operate([slice(None), slice(None)]) ); check(*get_shared_arrays('y', 'x', 'r'))  # local test

    pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items())
    yslices = [slice(0,333), slice(333,666), slice(666,None)]
    xslices = [slice(0,1000), slice(1000,None)]
    #xslices = [slice(None)]  # uncomment this if you only want to divide things up in the y dimension
    reports = pool.map(operate, itertools.product(yslices, xslices))
    print('\n'.join(reports))
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))

有点复杂,但非常有效。 - Samuel Prevost

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接