如何使进程能够在主程序的数组中进行写入?

9
我正在制作一个进程池,每个进程都需要在主程序中存在的矩阵的不同部分中进行写入。由于每个进程将使用矩阵的不同行,因此不存在覆盖信息的风险。如何使矩阵可以从进程内部进行写入?
该程序是一款矩阵乘法器,由教授指定并且需要多进程处理。它将为计算机的每个核心创建一个进程。主程序将向各个进程发送矩阵的不同部分,它们将计算这些部分,然后以一种我可以识别哪个响应对应哪个行的方式返回它们。

你确定需要这个吗?乍一看,多进程的重点是分配计算任务,而不是写入数组。为什么你的子程序不能返回适当的结果,以供主程序进一步处理呢? - Lev Levitsky
是的,你说得对。那么我需要问的是,如何管理可能有多个进程(每个核心一个)的响应? - user1249212
1
multiprocessing 模块允许您收集异步运行的函数的结果,无论它们返回什么。比如,它们可以返回与矩阵行对应的1D数组。如果您的情况更复杂,请编辑您的帖子以提供更多细节。 - Lev Levitsky
我添加了更多信息,感谢你们的帮助。 - user1249212
1
如何从多个进程向numpy数组中写入数据的示例:点击此处 - jfs
4个回答

11

+1:对于共享数组,不需要同步,因此可以使用multiprocessing.RawArray - jfs
请您提供一个使用进程池和数组的简短示例好吗?使用pool.map似乎无法同时传递arrnum - YPOC
1
@YPOC,这方面已经有很多问答了。你查看过其中任何一个吗(例如这个那个)?否则,我建议你为你的问题发布一个新的问题。 - moooeeeep

6
新建进程或复制矩阵(如果进程被重用)的成本超过了矩阵乘法的成本。无论如何,numpy.dot() 可以自行利用不同的 CPU 核心。
可以通过在不同的进程中计算结果的不同行来分配矩阵乘法,例如,给定输入矩阵 ab,则结果的 (i,j) 元素为:
out[i,j] = sum(a[i,:] * b[:,j])

所以第 i 行可以计算为:

import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i,:])
    for j in xrange(b.shape[1]):
        # out[i,j] = sum(a[i,:] * b[:,j])
        np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])

numpy数组接受切片作为索引,例如,a[1:3,:]返回第2行和第3行。

ab是只读的,因此它们可以被子进程继承(在Linux上利用写时复制),结果使用共享数组计算。在计算过程中只复制索引:

import ctypes
import multiprocessing as mp

def dot(a, b, nprocesses=mp.cpu_count()):
    """Perform matrix multiplication using multiple processes."""
    if (a.shape[1] != b.shape[0]):
        raise ValueError("wrong shape")

    # create shared array
    mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])

    # start processes
    np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
    pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)

    # perform multiplication
    for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
        print("done %s" % (i,))
    pool.close()
    pool.join()

    # return result
    return tonumpyarray(*np_args)

在哪里:

def mpdot_slice(i):
    dot_slice(ga, gb, gout, i)
    return i

def init(a, b, *np_args):
    """Called on each child process initialization."""
    global ga, gb, gout
    ga, gb = a, b
    gout = tonumpyarray(*np_args)

def tonumpyarray(mp_arr, shape, dtype):
    """Convert shared multiprocessing array to numpy array.

    no data copying
    """
    return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)

def slices(nitems, mslices):
    """Split nitems on mslices pieces.

    >>> list(slices(10, 3))
    [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
    >>> list(slices(1, 3))
    [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
    """
    step = nitems // mslices + 1
    for i in xrange(mslices):
        yield slice(i*step, min(nitems, (i+1)*step))

测试它:

def test():
    n = 100000
    a = np.random.rand(50, n)
    b = np.random.rand(n, 60)
    assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))

在Linux上,这个多进程版本的性能与使用线程并在计算期间释放GIL(在C扩展中)的解决方案相同:
$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop

$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)' 
10 loops, best of 3: 88.8 msec per loop

$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop

注意:测试已更改为在所有地方使用np.float64

2

矩阵乘法是指计算每个结果矩阵元素。这似乎是使用进程池的工作。由于这是作业(也是为了遵循 SO 代码),我只会演示如何使用进程池本身,而不是整个解决方案。

因此,您需要编写一个程序来计算结果矩阵的 (i, j) 元素:

def getProductElement(m1, m2, i, j):
    # some calculations
    return element

然后您需要初始化连接池:
from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())

然后你需要提交这些任务。你也可以将它们组织成矩阵,但为什么要费心呢,让我们只列一个清单。

result = []
# here you need to iterate through the the columns of the first and the rows of
# the second matrix. How you do it, depends on the implementation (how you store
# the matrices). Also, make sure you check the dimensions are the same.
# The simplest case is if you have a list of columns:

N = len(m1)
M = len(m2[0])
for i in range(N):
    for j in range(M):
        results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))

然后使用结果填充矩阵:
m = []
count = 0
for i in range(N):
    column = []
    for j in range(M):
        column.append(results[count].get())
    m.append(column)

再次强调,代码的确切形状取决于您如何表示矩阵。


1
复制整个矩阵来计算单个元素是非常低效的。 - jfs
@J.F.Sebastian,我一直认为它们是按引用传递的。我一直错了吗?至少在这个例子中,矩阵是列表。 - Lev Levitsky
哎呀,我没有考虑到那个问题。现在我需要重新思考很多东西,谢谢 :) - Lev Levitsky
@user1249212 不,这不应该是个问题,该函数是异步应用的。然而,内存使用对于这个解决方案确实是一个问题,正如J.F. Sebastian所指出的那样。尝试将我的答案与moooeeep关于共享数组的建议相结合。 - Lev Levitsky
是的,我读了Sebastian在moooeeep的解决方案中的建议,我认为你回答了我所有的问题,并提出了更好的解决方案。非常感谢你的帮助。 - user1249212
显示剩余4条评论

-4

不需要。

要么他们以主程序可用的格式返回他们的编辑内容,要么你使用某种进程间通信方式让他们传送修改,或者你使用一些共享存储方式,比如数据库或者像 Redis 这样的数据结构服务器。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接