System error while running subprocesses using Multiprocessing


I ran into a system error (shown below) while doing some numpy-based matrix algebra with the multiprocessing package (Python 2.7.3 with numpy 1.7.0 on Ubuntu 12.04 on Amazon EC2). My code runs fine for smaller matrix sizes but crashes for larger ones (even with plenty of free memory available). The matrices I use are fairly large (and, incidentally, I pass these matrices to/from the subprocesses). 10 vs 500 is a runtime parameter; everything else stays the same (input data, other runtime parameters, etc.).

I also tried running the same code with Python 3 - for larger matrices the subprocesses go to sleep/idle (instead of crashing, as in Python 2.7) and the program/subprocesses just sit there doing nothing. For smaller matrices the code runs fine under Python 3.

Any suggestions would be highly appreciated (I am running out of ideas here).

Error message:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
SystemError: NULL result without error in PyObject_Call

The multiprocessing code I use:

import multiprocessing as mp

def runProcessesInParallelAndReturn(proc, listOfInputs, nParallelProcesses):
    if len(listOfInputs) == 0:
        return
    # Add the shared result queue to each argument tuple.
    resultQueue = mp.Manager().Queue()
    listOfInputsNew = [(argumentTuple, resultQueue) for argumentTuple in listOfInputs]
    # Create the pool of workers and run the processes.
    pool = mp.Pool(processes=nParallelProcesses)
    pool.map(proc, listOfInputsNew)
    pool.close()
    pool.join()
    # Return the results (in completion order; each proc tags its own chunk index).
    return [resultQueue.get() for i in range(len(listOfInputs))]
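
For context, a hypothetical call site for this helper might look as follows (the doubleAll worker and its inputs are made up for illustration, not part of the original post); each worker unpacks (argumentTuple, resultQueue) exactly the way solveForLFV does below:

import multiprocessing as mp

def doubleAll(param):
    # Mirrors the (argumentTuple, queue) unpacking used by solveForLFV below.
    (chunkI, numbers), queue = param
    queue.put((chunkI, [2 * x for x in numbers]))

if __name__ == '__main__':
    inputs = [(i, range(5)) for i in range(4)]
    # Results come back in completion order, tagged with their chunk index.
    results = runProcessesInParallelAndReturn(doubleAll, inputs, 2)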

Here is the 'proc' that each subprocess executes. Basically, it solves many systems of linear equations using numpy (constructing the required matrices inside the subprocess) and returns the results as another matrix. Once again, it runs fine for smaller values of the one runtime parameter but crashes (or hangs in Python 3) for larger ones.

import sys
import time

import numpy as np

def solveForLFV(param):
    startTime = time.time()
    (chunkI, LFVin, XY, sumLFVinOuterProductLFVallPlusPenaltyTerm, indexByIndexPurch, outerProductChunkSize, confWeight), queue = param
    LFoutChunkSize = XY.shape[0]
    nLFdim = LFVin.shape[1]
    sumLFVinOuterProductLFVpurch = np.zeros((nLFdim, nLFdim))
    LFVoutChunk = np.zeros((LFoutChunkSize, nLFdim))
    for LFVoutIndex in xrange(LFoutChunkSize):
        LFVInIndexListPurch = indexByIndexPurch[LFVoutIndex]
        sumLFVinOuterProductLFVpurch[:, :] = 0.
        # getChunkBoundaries is a helper from the original code, not shown here.
        LFVInIndexChunkLow, LFVInIndexChunkHigh = getChunkBoundaries(len(LFVInIndexListPurch), outerProductChunkSize)
        for LFVInIndexChunkI in xrange(len(LFVInIndexChunkLow)):
            LFVinSlice = LFVin[LFVInIndexListPurch[LFVInIndexChunkLow[LFVInIndexChunkI] : LFVInIndexChunkHigh[LFVInIndexChunkI]], :]
            sumLFVinOuterProductLFVpurch += sum(LFVinSlice[:, :, np.newaxis] * LFVinSlice[:, np.newaxis, :])
        LFVoutChunk[LFVoutIndex, :] = np.linalg.solve(confWeight * sumLFVinOuterProductLFVpurch + sumLFVinOuterProductLFVallPlusPenaltyTerm, XY[LFVoutIndex, :])
    queue.put((chunkI, LFVoutChunk))
    print 'solveForLFV: ', time.time() - startTime, 'sec'
    sys.stdout.flush()

Could you share the code of the proc function? - barracel
Just did. I haven't described the arguments of proc - some of them are matrices, some are lists of lists, and some are just floats/ints. queue is used to return the results from each subprocess. - Yevgeny
1 Answer


500,000,000 is pretty big: if you're using float64, that's 4 billion bytes, or about 4 GB. (The 10,000,000-float array only takes 80,000,000 bytes, about 80 MB - much smaller.) My guess is that the problem is multiprocessing trying to pickle the arrays in order to send them to the subprocesses through a pipe.
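
As a rough illustration of that guess (a hypothetical sketch, not code from the original post): sending a single large array over a multiprocessing pipe forces a pickle of the entire buffer behind the scenes, and on Python 2.7 that pickling step is what fails once the payload reaches the multi-gigabyte range. (Don't run this unless you have the RAM to spare; with a small array it completes normally.)

import multiprocessing as mp
import numpy as np

def child(conn):
    arr = conn.recv()            # the child unpickles the whole array here
    conn.send(arr.shape)

if __name__ == '__main__':
    parentConn, childConn = mp.Pipe()
    p = mp.Process(target=child, args=(childConn,))
    p.start()
    big = np.zeros(500000000)    # 500,000,000 float64s, about 4 GB
    parentConn.send(big)         # send() pickles the buffer; blows up on Python 2.7
    print(parentConn.recv())
    p.join()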

Since you're on a unix platform, you can avoid this by exploiting the memory-inheritance behavior of fork() (which multiprocessing uses to create its workers). Here is a hack I have used successfully for this project (as explained in its comments).

### A helper for letting the forked processes use data without pickling.
import itertools
import os
import random
import string

_data_name_cands = (
    '_data_' + ''.join(random.sample(string.ascii_lowercase, 10))
    for _ in itertools.count())
class ForkedData(object):
    '''
    Class used to pass data to child processes in multiprocessing without
    really pickling/unpickling it. Only works on POSIX.

    Intended use:
        - The master process makes the data somehow, and does e.g.
            data = ForkedData(the_value)
        - The master makes sure to keep a reference to the ForkedData object
          until the children are all done with it, since the global reference
          is deleted to avoid memory leaks when the ForkedData object dies.
        - Master process constructs a multiprocessing.Pool *after*
          the ForkedData construction, so that the forked processes
          inherit the new global.
        - Master calls e.g. pool.map with data as an argument.
        - Child gets the real value through data.value, and uses it read-only.
    '''
    # TODO: does data really need to be used read-only? don't think so...
    # TODO: more flexible garbage collection options
    def __init__(self, val):
        g = globals()
        self.name = next(n for n in _data_name_cands if n not in g)
        g[self.name] = val
        self.master_pid = os.getpid()

    @property
    def value(self):
        return globals()[self.name]

    def __del__(self):
        if os.getpid() == self.master_pid:
            del globals()[self.name]
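
A minimal usage sketch, following the docstring above (the rowSum worker and the array are illustrative assumptions, not part of the original answer). The one ordering constraint that matters is that the Pool is created *after* the ForkedData, so the forked workers inherit the global it registered:

import multiprocessing as mp
import numpy as np

def rowSum(args):
    rowIndex, forked = args
    big = forked.value           # reads the inherited array; no array pickling
    return rowIndex, big[rowIndex].sum()

if __name__ == '__main__':
    bigArray = np.random.rand(1000, 1000)
    data = ForkedData(bigArray)  # keep this reference alive until the workers finish
    pool = mp.Pool(processes=4)  # fork *after* the ForkedData construction
    results = pool.map(rowSum, [(i, data) for i in range(10)])
    pool.close()
    pool.join()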
