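The first thing to say is: if it's about multiple cores on the same processor, numpy is already capable of parallelizing the operation better than we could ever do by hand (see the discussion at Multiplying large arrays in Python). In that case the key is simply to ensure that the multiplication is done as a bulk array operation rather than in a Python for loop: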
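# Assumes, as in the question, that n is the numpy module and that test holds the output of mult().
test2 = x[n.newaxis, :] * y[:, n.newaxis]
# Verify equivalence to mult(): the output should be 0.0, or very small,
# reflecting floating-point precision limitations.
n.abs(test - test2).max()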
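Since the snippet above depends on variables defined in the question, here is a self-contained sketch of the same comparison. The function mult_loop is a hypothetical stand-in for the question's mult() (assumed to be an element-by-element product computed with explicit loops), and the array sizes are arbitrary.

```python
import numpy

def mult_loop(y, x):
    """Hypothetical stand-in for mult(): outer product via explicit Python loops."""
    r = numpy.empty((len(y), len(x)))
    for i in range(len(y)):
        for j in range(len(x)):
            r[i, j] = y[i] * x[j]
    return r

def mult_bulk(y, x):
    """The same product as a single broadcast (bulk) array operation."""
    return y[:, numpy.newaxis] * x[numpy.newaxis, :]

y = numpy.arange(50, dtype=float)
x = numpy.random.rand(80)

loop_result = mult_loop(y, x)
bulk_result = mult_bulk(y, x)

# The two results should agree to within floating-point precision.
max_diff = numpy.abs(loop_result - bulk_result).max()
print('max. abs. diff. = %g' % max_diff)
```

The bulk version hands the whole computation to numpy in one call, which is what lets numpy do the heavy lifting instead of the Python interpreter.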
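Spreading the operation across multiple separate CPUs would be a different matter, but the question seems to imply a single (multi-core) CPU. So, with that in mind: suppose you want to parallelize an operation more complicated than mult(). Suppose you have tried hard to optimize it into bulk array operations that numpy can parallelize by itself, but your operation just isn't amenable to that. In that case, you can use a shared-memory multiprocessing.Array created with lock=False, together with a multiprocessing.Pool that assigns processes to non-overlapping chunks of it, divided along the y dimension (and simultaneously along the x dimension if you want). An example listing follows. Note that this approach does not explicitly do what you specified (clump the results together and append them to a single array). Rather, it does something more efficient: multiple processes simultaneously assemble their portions of the answer in non-overlapping regions of shared memory. Once they are done, no collation or appending is necessary: we just read out the result.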
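import os, numpy, multiprocessing, itertools

# Maps each name to the (buffer, shape, typecode, ismatrix) tuple describing a shared array.
SHARED_VARS = {}

def operate(slices):
    # Each task receives one (yslice, xslice) pair identifying its chunk.
    yslice, xslice = slices
    y, x, r = get_shared_arrays('y', 'x', 'r')
    y = y[yslice]
    x = x[xslice]
    r = r[yslice, xslice]  # a view, so writes land directly in shared memory
    for i in range(len(r)):
        r[i] = y[i] * x
    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)
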
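def check(y, x, r):
    # Compare the shared result against a bulk numpy computation of the same product.
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis]
    print('max. abs. diff. = %g' % numpy.abs(r - r2).max())
    return y, x, r

def slicestr(s):
    # Render a slice object in start:stop:step notation.
    return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step])
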
def m2n(buf, shape, typecode, ismatrix=False):
    """
    Return a numpy.array VIEW of a multiprocessing.Array given a
    handle to the array, the shape, the data typecode, and a boolean
    flag indicating whether the result should be cast as a matrix.
    """
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
    if ismatrix:
        a = numpy.asmatrix(a)
    return a

def n2m(a):
    """
    Return a multiprocessing.Array COPY of a numpy.array, together
    with shape, typecode and matrix flag.
    """
    if not isinstance(a, numpy.ndarray):
        a = numpy.array(a)
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)

def new_shared_array(shape, typecode='d', ismatrix=False):
    """
    Allocate a new shared array and return all the details required
    to reinterpret it as a numpy array or matrix (same order of
    output arguments as n2m)
    """
    typecode = numpy.dtype(typecode).char
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix

def get_shared_arrays(*names):
    return [m2n(*SHARED_VARS[name]) for name in names]

def init(*pargs, **kwargs):
    SHARED_VARS.update(pargs, **kwargs)

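if __name__ == '__main__':

    ylen = 1000
    xlen = 2000

    init(y=n2m(range(ylen)))
    init(x=n2m(numpy.random.rand(xlen)))
    init(r=new_shared_array([ylen, xlen], float))

    print('Master process ID is %s' % os.getpid())

    # Each worker registers the shared arrays via init(). A tuple of (name, value)
    # pairs is used instead of a dict view so that initargs can also be pickled
    # when workers are spawned rather than forked.
    pool = multiprocessing.Pool(initializer=init, initargs=tuple(SHARED_VARS.items()))
    yslices = [slice(0,333), slice(333,666), slice(666,None)]
    xslices = [slice(0,1000), slice(1000,None)]
    # One task per (yslice, xslice) combination; the chunks do not overlap.
    reports = pool.map(operate, itertools.product(yslices, xslices))
    pool.close()
    pool.join()
    print('\n'.join(reports))
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))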
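The listing hard-codes its yslices and xslices for the given array lengths. As a rough sketch, they could instead be generated for any length and chunk count. The helper chunk_slices below is hypothetical (it is not part of the listing above), and it simply splits a range into contiguous, non-overlapping pieces of nearly equal size.

```python
import itertools

def chunk_slices(length, nchunks):
    """Split range(length) into nchunks contiguous, non-overlapping slices.

    Hypothetical helper: the first (length % nchunks) slices are one
    element longer, so that the slices cover the whole range exactly.
    """
    if nchunks < 1:
        raise ValueError('nchunks must be at least 1')
    base, extra = divmod(length, nchunks)
    slices = []
    start = 0
    for i in range(nchunks):
        stop = start + base + (1 if i < extra else 0)
        slices.append(slice(start, stop))
        start = stop
    return slices

# Same decomposition as in the listing: 3 chunks along y, 2 along x.
yslices = chunk_slices(1000, 3)
xslices = chunk_slices(2000, 2)
tasks = list(itertools.product(yslices, xslices))
print('%d tasks' % len(tasks))
```

The resulting tasks list could be passed to pool.map(operate, tasks) in place of the hand-written slices. A natural starting point for the chunk counts is the number of worker processes, though the best split depends on the cost of the operation.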