在一个长度相同的一维NumPy数组上,评估一维函数数组的并行化算法

5
以下内容的要点是,我有一个 pe embarrassingly parallel 的 for 循环,我正在尝试将其线程化。需要解释一下问题,但尽管有很多冗长的描述,我认为这应该是一个相当简单的问题,多进程模块应该能够轻松解决。
我有一个长度为 N 的大数组,包含 k 个不同的函数,以及一个长度为 N 的 abcissa 数组。由于 @senderle 提供的巧妙解决方案(Efficient algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array),我可以使用快速的基于 numpy 的算法,在 abcissa 上评估这些函数,返回一个长度为 N 的 ordinates 数组。
def apply_indexed_fast(abcissa, func_indices, func_table):
    """ Returns the output of an array of functions evaluated at a set of input points 
    if the indices of the table storing the required functions are known. 

    Parameters 
    ----------
    func_table : array_like 
        Length k array of function objects

    abcissa : array_like 
        Length Npts array of points at which to evaluate the functions. 

    func_indices : array_like 
        Length Npts array providing the indices to use to choose which function 
        operates on each abcissa element. Thus func_indices is an array of integers 
        ranging between 0 and k-1. 

    Returns 
    -------
    out : array_like 
        Length Npts array giving the evaluation of the appropriate function on each 
        abcissa element. 
    """
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(abcissa)

    for i in range(len(func_table)):
        f = func_table[i]
        start = func_ranges[i]
        end = func_ranges[i+1]
        ix = func_argsort[start:end]
        out[ix] = f(abcissa[ix])

    return out

我现在尝试使用多进程来并行化此函数内部的for循环。为了清晰起见,我将简要概述@senderle开发的算法。如果您可以立即阅读上面的代码并理解它,请跳过下一段文字。
首先,我们找到对输入func_indices进行排序的索引数组,用它来定义长度为k的整数数组func_rangesfunc_ranges的整数条目控制应用于输入abcissa的适当子数组的函数,其工作方式如下。让f是输入func_table中的第i个函数。然后,我们应该将函数f应用于的abcissa的切片是slice(func_ranges [i],func_ranges [i + 1])。因此,一旦计算出func_ranges,我们只需在输入func_table上运行简单的for循环,并依次将每个函数对象应用于适当的切片,填充输出数组。请参见下面的代码,以了解此算法在操作中的最小示例。
def trivial_functional(i): 
    def f(x):
        return i*x
    return f

k = 250
func_table = np.array([trivial_functional(j) for j in range(k)])

Npts = 1e6
abcissa = np.random.random(Npts)
func_indices = np.random.random_integers(0,len(func_table)-1,Npts)

result = apply_indexed_fast(abcissa, func_indices, func_table)

所以我的目标现在是使用多进程来并行计算。我认为使用线程的尴尬并行循环的常规技巧应该很简单。但是,我以下的尝试引发了一个我不理解的异常。

from multiprocessing import Pool, cpu_count
def apply_indexed_parallelized(abcissa, func_indices, func_table):
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(abcissa)

    num_cores = cpu_count()
    pool = Pool(num_cores)

    def apply_funci(i):
        f = func_table[i]
        start = func_ranges[i]
        end = func_ranges[i+1]
        ix = func_argsort[start:end]
        out[ix] = f(abcissa[ix])

    pool.map(apply_funci, range(len(func_table)))
    pool.close()

    return out

result = apply_indexed_parallelized(abcissa, func_indices, func_table)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我在 Stack Overflow 上看到过这个问题:如何在类中使用 Pool.map 进行多进程处理?。我已经尝试了那里提出的每种方法;在所有情况下,我都会得到一个“打开的文件太多”的错误,因为线程从未关闭,或者适应的算法只是挂起。这似乎应该有一个简单的解决方案,因为这只是线程化一个并行的 for 循环。

如果我没记错的话,你的问题是由于使用了一个不可从__main__命名空间访问的函数(即将在另一个函数内部定义或者超出主要范围的函数传递给池)。 - Joe Kington
是的,我之前也遇到过这个建议,但我没有看到有人实现解决方案。我的第一个想法是Python正在强制执行这个极其严格的条件。你能看出如何使这个问题符合这个要求吗?我不知道该怎么做,因为func_table和func_indices必须绑定在我的函数命名空间内。 - aph
2
一个解决方案是传递一个类的实例。然而,你有一个更深层次的问题。你把numpy数组当作共享内存来处理。相反,将会复制并独立地传递给每个进程。原始数组不会被修改。 - Joe Kington
是的,你说得对,这是一个更深层次的问题。这是我第一次尝试以这种方式并行化某些东西,所以我以前没有遇到过这个问题。你知道如何使用多进程模块以允许线程共享对numpy数组的访问吗?或者你能指导我去哪里阅读相关资源吗? - aph
1
关于numpy和多进程的其他问题,请参考sharedmem,https://github.com/sturlamolden/sharedmem-numpy。但它只能节省内存,而不能节省时间。 - hpaulj
如果函数释放了GIL(大多数Numpy函数都会在某种程度上这样做),那么您可以从实际的线程而不是单独的进程中受益。为了进行快速测试,请将第一行更改为“from multiprocessing.dummy import Pool,cpu_count”。这应该消除对所有内容进行pickle的需要。 - user2379410
1个回答

7

警告/注意:

您可能不想将multiprocessing应用于此问题。您会发现,对于大型数组的相对简单操作,问题将受到numpy的内存限制。瓶颈是将数据从RAM移动到CPU缓存中。CPU缺乏数据,因此将更多的CPU投入问题并没有太大帮助。此外,您当前的方法将为输入序列中的每个项目拆分和复制整个数组,这会增加很多开销。

有很多情况下,numpy+multiprocessing非常有效,但您需要确保正在处理一个CPU绑定问题。理想情况下,它是一个具有相对较小的输入和输出以减轻输入和输出拆分的开销的CPU绑定问题。对于numpy最常用的许多问题来说,情况并非如此。


你当前方法存在两个问题

接下来回答你的问题:

你目前的错误是因为传入了一个无法从全局作用域访问的函数(即在函数内部定义的函数)。

然而,你还有另一个问题。你把numpy数组当作共享内存来处理,认为每个进程都可以修改它们。但实际上,在使用多进程时,原始数组将被pickle化(相当于复制)并独立地传递给每个进程。原始数组永远不会被修改。


避免PicklingError

为了复现您的错误,考虑以下最简示例:

import multiprocessing

def apply_parallel(input_sequence):
    def func(x):
        pass
    pool = multiprocessing.Pool()
    pool.map(func, input_sequence)
    pool.close()

foo = range(100)
apply_parallel(foo)

这将导致:
PicklingError: Can't pickle <type 'function'>: attribute lookup 
               __builtin__.function failed

当然,在这个简单的例子中,我们可以将函数定义简单地移回到__main__命名空间中。但是,在您的情况下,需要引用传递进来的数据。让我们看一个更接近您正在做的示例:
import numpy as np
import multiprocessing

def parallel_rolling_mean(data, window):
    data = np.pad(data, window, mode='edge')
    ind = np.arange(len(data)) + window

    def func(i):
        return data[i-window:i+window+1].mean()

    pool = multiprocessing.Pool()
    result = pool.map(func, ind)
    pool.close()
    return result

foo = np.random.rand(20).cumsum()
result = parallel_rolling_mean(foo, 10)

有多种方法可以处理这个问题,但常见的方法是:

import numpy as np
import multiprocessing

class RollingMean(object):
    def __init__(self, data, window):
        self.data = np.pad(data, window, mode='edge')
        self.window = window

    def __call__(self, i):
        start = i - self.window
        stop = i + self.window + 1
        return self.data[start:stop].mean()

def parallel_rolling_mean(data, window):
    func = RollingMean(data, window)
    ind = np.arange(len(data)) + window

    pool = multiprocessing.Pool()
    result = pool.map(func, ind)
    pool.close()
    return result

foo = np.random.rand(20).cumsum()
result = parallel_rolling_mean(foo, 10)

太棒了!它起作用了!


然而,如果您将其扩展到大型数组,则很快会发现它要么运行速度非常慢(可以通过在pool.map调用中增加chunksize来加速),要么很快就会耗尽内存(一旦增加chunksize)。

multiprocessing对输入数据进行了pickle处理,以便可以将其传递给独立的Python进程。这意味着您为每个操作的i复制了整个数组。

稍后我们会回到这一点...


multiprocessing不会在进程之间共享内存

multiprocessing模块通过将输入进行pickling并将其传递给独立进程来工作。这意味着,如果您在一个进程中修改了某些内容,则另一个进程将看不到该修改。

但是,multiprocessing还提供了存在于共享内存中的原语,可以被子进程访问和修改。有几种不同的方法可以调整numpy数组以使用共享内存multiprocessing.Array。但是,我建议您首先避免使用它们(如果您不熟悉虚假共享, 请先阅读相关资料)。虽然有些情况下它非常有用,但通常是为了节省内存而不是提高性能。

因此,最好在单个进程中执行对大数组的所有修改(这也是一种非常有用的通用IO模式)。它不必是“主”进程,但这种方式最容易理解。
例如,假设我们想要让我们的parallel_rolling_mean函数接受一个输出数组来存储数据。以下是类似于以下内容的有用模式。请注意使用迭代器并仅在主进程中修改输出:
import numpy as np
import multiprocessing

def parallel_rolling_mean(data, window, output):
    def windows(data, window):
        padded = np.pad(data, window, mode='edge')
        for i in xrange(len(data)):
            yield padded[i:i + 2*window + 1]

    pool = multiprocessing.Pool()
    results = pool.imap(np.mean, windows(data, window))
    for i, result in enumerate(results):
        output[i] = result
    pool.close()

foo = np.random.rand(20000000).cumsum()
output = np.zeros_like(foo)
parallel_rolling_mean(foo, 10, output)
print output

希望这个例子能够帮助澄清一些问题。

chunksize 和性能

关于性能的一个快速提示:如果我们将其扩展,它会变得非常缓慢。如果您查看系统监视器(例如top/htop),您可能会注意到大部分时间内您的核心处于空闲状态。

默认情况下,主进程为每个进程拆分并立即传递每个输入,然后等待它们完成以拆分下一个输入。在许多情况下,这意味着主进程工作,然后处于空闲状态,而工作进程正在忙碌,然后工作进程处于空闲状态,而主进程正在拆分下一个输入。

关键是要增加 chunksize 参数。这将导致 pool.imap 为每个进程“预取”指定数量的输入。基本上,主线程可以保持繁忙状态拆分输入,而工作进程可以保持繁忙状态处理。缺点是您使用了更多的内存。如果每个输入使用大量RAM,则可能不是一个好主意。但是,如果没有,这可以极大地加快速度。

以下是一个快速示例:

import numpy as np
import multiprocessing

def parallel_rolling_mean(data, window, output):
    def windows(data, window):
        padded = np.pad(data, window, mode='edge')
        for i in xrange(len(data)):
            yield padded[i:i + 2*window + 1]

    pool = multiprocessing.Pool()
    results = pool.imap(np.mean, windows(data, window), chunksize=1000)
    for i, result in enumerate(results):
        output[i] = result
    pool.close()

foo = np.random.rand(2000000).cumsum()
output = np.zeros_like(foo)
parallel_rolling_mean(foo, 10, output)
print output

使用chunksize=1000,处理一个200万元素的数组需要21秒:

python ~/parallel_rolling_mean.py  83.53s user 1.12s system 401% cpu 21.087 total

但是使用chunksize=1(默认值)需要大约8倍的时间(2分钟41秒)。

python ~/parallel_rolling_mean.py  358.26s user 53.40s system 246% cpu 2:47.09 total

事实上,使用默认的块大小,它实际上比相同任务的单进程实现要糟糕得多,后者只需要45秒:
python ~/sequential_rolling_mean.py  45.11s user 0.06s system 99% cpu 45.187 total

哇,非常有启发的回答,乔。我已经读了好几遍,并且从中不断学到新的东西。非常非常感谢你抽出时间写下这样深思熟虑的回答。 - aph

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接