在原地映射NumPy数组

56

是否有可能就地映射一个NumPy数组?如果是,如何实现?

给定 a_values - 2D 数组 - 目前这段代码可以满足我的需求:

for row in range(len(a_values)):
    for col in range(len(a_values[0])):
        a_values[row][col] = dim(a_values[row][col])

但这样做实在是太丑了,我怀疑在 NumPy 中一定有一个函数可以用以下代码完成相同的操作并且更加美观:

a_values.map_in_place(dim)

但如果像上面提到的那样有类似的东西存在,我一直没有找到它。


5
你可以使用a_values = np.vectorize(dim)(a_values)来避免使用嵌套循环,但这仍然不是一种原地操作的方法,所以它并不是答案。 - Dan D.
7
我不知道有没有这样的函数,但如果有的话,它只会让代码看起来更整洁。如果你想获得 Numpy 特有的性能提升,那么你需要重新编写 dim() 函数,使其直接适用于 numpy 数组。 - Bob
我曾经尝试滥用vectorize,但现在放弃了。赞成Bob的看法。 - senderle
@Joe Kington,有两个问题:首先,每次调用一个矢量化函数时,实际上它所包装的函数会在数组中第一个值上被调用 两次 ,以确定返回值的类型!因此,使用 vectorize 进行就地工作会导致函数在第一个值上双重应用。所以你必须规避这种行为。其次,这种丑陋的操作没有任何回报;根据我的测试结果,结果并不比嵌套的 for 循环快。 - senderle
尽管性能可能受到影响,但 resize 可以原地重新调整数组形状。 - Samuel
显示剩余5条评论
5个回答

55

只有在空间受到严重限制时,才值得尝试原地操作。如果是这种情况,可以通过迭代数组的平坦视图来稍微加快代码的运行速度。由于reshape 在可能的情况下返回一个新视图,数据本身不会被复制(除非原始数据结构不寻常)。

我不知道有更好的方法来实现对任意Python函数的原地应用。

>>> def flat_for(a, f):
...     a = a.reshape(-1)
...     for i, v in enumerate(a):
...         a[i] = f(v)
... 
>>> a = numpy.arange(25).reshape(5, 5)
>>> flat_for(a, lambda x: x + 5)
>>> a

array([[ 5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14],
       [15, 16, 17, 18, 19],
       [20, 21, 22, 23, 24],
       [25, 26, 27, 28, 29]])

一些时间:

>>> a = numpy.arange(2500).reshape(50, 50)
>>> f = lambda x: x + 5
>>> %timeit flat_for(a, f)
1000 loops, best of 3: 1.86 ms per loop

这个版本的速度是嵌套循环版本的大约两倍:

>>> a = numpy.arange(2500).reshape(50, 50)
>>> def nested_for(a, f):
...     for i in range(len(a)):
...         for j in range(len(a[0])):
...             a[i][j] = f(a[i][j])
... 
>>> %timeit nested_for(a, f)
100 loops, best of 3: 3.79 ms per loop

当然向量化仍然更快,所以如果你可以复制,那么就使用复制:

>>> a = numpy.arange(2500).reshape(50, 50)
>>> g = numpy.vectorize(lambda x: x + 5)
>>> %timeit g(a)
1000 loops, best of 3: 584 us per loop

如果您可以使用内置的ufunc重写dim,那么请不要使用vectorize

>>> a = numpy.arange(2500).reshape(50, 50)
>>> %timeit a + 5
100000 loops, best of 3: 4.66 us per loop

numpy通过就地操作(in-place)执行像+=这样的操作,就像您所期望的那样-因此,您可以获得ufunc速度与无代价的原位应用。有时它甚至更快!请参见此处的示例。


顺便说一下,我对这个问题的最初回答(可以在其编辑历史记录中查看)是荒谬的,涉及到对a索引进行向量化。它不仅必须绕过vectorize类型检测机制,而且结果和嵌套循环版本一样慢。所以聪明反被聪明误!


谢谢你提供的信息(+1)-我会在今天晚些时候开始工作时进行测试。至于为什么我需要这个……数组在pygame.surfarray.pixels2d内部使用。该数组是对图像像素值的引用,而不是副本,所以如果我想要修改场景中的图像/精灵,我需要修改从pygame获得的数组。话虽如此,这是我第一次接触pynum,所以如果我漏掉了什么,请随时纠正我的理解! :) - mac
1
@mac,如果是这样的话,我建议使用eryksun在评论中发布的解决方案:a_values[:] = np.vectorize(dim)(a_values)。它会创建一个副本,但切片赋值(a_values[:])会就地更改数组。如果这不起作用,请告诉我。 - senderle
1
如果你可以使用ufunc重写dim,那么切片赋值+ufunc_dim(a_values[:] = ufunc_dim(a_values))将是最好的解决方案。 - senderle
1
@eryksun,根据mac的最新评论,你的解决方案是最好的。如果你回答的话,我会点赞的。 - senderle
“毫不夸张”简直是轻描淡写了!!!(请看我的回答)。另外,如果真的想走这条路,似乎切片赋值是非NumPy替代方案中最相关的改进。 - mac
显示剩余2条评论

46
这是一篇关于回答和评论中的贡献的撰写,我在接受问题的答案后写下了这篇文章。赞成票总是受欢迎的,但如果您赞成这个答案,请不要忘记也给那些建议下面方法的senderle和(如果他/她写了一个)eryksun投票。
问:是否可能就地映射 numpy 数组? 答:是的,但无法通过单个数组方法实现。您必须编写自己的代码。
以下是比较讨论线程中各种实现的脚本:
import timeit
from numpy import array, arange, vectorize, rint

# SETUP
get_array = lambda side : arange(side**2).reshape(side, side) * 30
dim = lambda x : int(round(x * 0.67328))

# TIMER
def best(fname, reps, side):
    global a
    a = get_array(side)
        t = timeit.Timer('%s(a)' % fname,
                     setup='from __main__ import %s, a' % fname)
    return min(t.repeat(reps, 3))  #low num as in place --> converge to 1

# FUNCTIONS
def mac(array_):
    for row in range(len(array_)):
        for col in range(len(array_[0])):
            array_[row][col] = dim(array_[row][col])

def mac_two(array_):
    li = range(len(array_[0]))
    for row in range(len(array_)):
        for col in li:
            array_[row][col] = int(round(array_[row][col] * 0.67328))

def mac_three(array_):
    for i, row in enumerate(array_):
        array_[i][:] = [int(round(v * 0.67328)) for v in row]


def senderle(array_):
    array_ = array_.reshape(-1)
    for i, v in enumerate(array_):
        array_[i] = dim(v)

def eryksun(array_):
    array_[:] = vectorize(dim)(array_)

def ufunc_ed(array_):
    multiplied = array_ * 0.67328
    array_[:] = rint(multiplied)

# MAIN
r = []
for fname in ('mac', 'mac_two', 'mac_three', 'senderle', 'eryksun', 'ufunc_ed'):
    print('\nTesting `%s`...' % fname)
    r.append(best(fname, reps=50, side=50))
    # The following is for visually checking the functions returns same results
    tmp = get_array(3)
    eval('%s(tmp)' % fname)
    print tmp
tmp = min(r)/100
print('\n===== ...AND THE WINNER IS... =========================')
print('  mac (as in question)       :  %.4fms [%.0f%%]') % (r[0]*1000,r[0]/tmp)
print('  mac (optimised)            :  %.4fms [%.0f%%]') % (r[1]*1000,r[1]/tmp)
print('  mac (slice-assignment)     :  %.4fms [%.0f%%]') % (r[2]*1000,r[2]/tmp)
print('  senderle                   :  %.4fms [%.0f%%]') % (r[3]*1000,r[3]/tmp)
print('  eryksun                    :  %.4fms [%.0f%%]') % (r[4]*1000,r[4]/tmp)
print('  slice-assignment w/ ufunc  :  %.4fms [%.0f%%]') % (r[5]*1000,r[5]/tmp)
print('=======================================================\n')

上述脚本的输出结果 - 至少在我的系统中 - 是:
  mac (as in question)       :  88.7411ms [74591%]
  mac (optimised)            :  86.4639ms [72677%]
  mac (slice-assignment)     :  79.8671ms [67132%]
  senderle                   :  85.4590ms [71832%]
  eryksun                    :  13.8662ms [11655%]
  slice-assignment w/ ufunc  :  0.1190ms [100%]

如您所见,使用numpy的ufunc相比第二和最差的替代方案,速度提高了2个甚至接近3个数量级

如果不能使用ufunc,以下是其他替代方案的比较:

  mac (as in question)       :  91.5761ms [672%]
  mac (optimised)            :  88.9449ms [653%]
  mac (slice-assignment)     :  80.1032ms [588%]
  senderle                   :  86.3919ms [634%]
  eryksun                    :  13.6259ms [100%]

HTH!


4
这是我见过的最好的自问自答之一,值得点赞 :)。 - senderle
此外,“分块切片赋值技巧”(在“mac_three”中)也很有趣,我不禁想知道是否可以使用ufunc代替列表推导式,在空间和时间效率之间达到令人信服的折衷——例如每次迭代处理数组的10%,或类似的方法。 - senderle
@senderle - 感谢您对解决问题的赞赏和建议! ;) 在我的应用程序中,我仅在初始化时使用此函数来生成约100个10x10像素精灵,因此我并不真正追求超级优化... 我最初的问题确实只是出于希望使我的代码更整洁/学习新东西的愿望,但我发布了测试源码,以便其他人如果愿意可以继续玩耍! :) - mac
我知道这已经过时了,但有三个评论。1. 我会在所有情况下将 dim 等变量设为局部变量,以减少开销并更好地显示不同情况之间的差异比例。2. 可以通过使用 array_set = array_.__setitem__; any(array_set(i, dim(x)) for i, x in enumerate(array_)) 来微调 Senderle 的代码。3. 我不确定 eryksun 的版本是否真正是原地操作的。这个问题有被追踪吗?在某些情况下,切片赋值中的右侧项目会被完全评估,以加速实际赋值,因此会暂时创建一个副本。 - agf
@aag - 字符串格式化的语法在py3中已经改变。文档在这里:https://docs.python.org/3.1/library/string.html#format-specification-mini-language。 - mac
显示剩余2条评论

3
为什么不使用numpy的实现,并且使用out_技巧?
from numpy import array, arange, vectorize, rint, multiply, round as np_round 

def fmilo(array_):
    np_round(multiply(array_ ,0.67328, array_), out=array_)

got:

===== ...AND THE WINNER IS... =========================
  mac (as in question)       :  80.8470ms [130422%]
  mac (optimised)            :  80.2400ms [129443%]
  mac (slice-assignment)     :  75.5181ms [121825%]
  senderle                   :  78.9380ms [127342%]
  eryksun                    :  11.0800ms [17874%]
  slice-assignment w/ ufunc  :  0.0899ms [145%]
  fmilo                      :  0.0620ms [100%]
=======================================================

看起来不错...但是它不起作用! :( 你从这个函数得到的结果([[ 0 20 40][ 60 80 100][121 141 161]])与其他测试结果([[ 0 20 40][ 61 81 101][121 141 162]])不一致。如果你能解决这个问题,我很乐意在我的答案中包含你的解决方案并点赞你的回答! :) - mac
3
@mac,@fabrizioM,我想我明白了发生了什么。当你通过“out”将输出数组传递给numpy ufunc时,它会自动将结果转换为输出数组的类型。所以在这种情况下,浮点结果被转换为整数(因此被截断)后再被存储。因此,“fmilo”在功能上等同于“array_ *= 0.67328”。要获得所需的四舍五入行为,您必须执行类似于“rint((array_ * 0.67328), array_)”的操作。但在我的机器上,这实际上比切片赋值慢。 - senderle

2
这只是Mac的论述的更新版本,适用于Python 3.x,并且添加了numbanumpy.frompyfuncnumpy.frompyfunc接受任意的Python函数并返回一个函数,该函数当被转换为numpy.array时,逐个元素地应用该函数。但是,它会改变数组的数据类型为对象,因此不是原位的,并且对该数组的未来计算速度会变慢。
为了避免这种缺点,在测试中将调用numpy.ndarray.astype,将数据类型返回到int类型。
另外说明:
Numba不包含在Python基本库中,如果你想测试它,必须从外部下载。在这个测试中,它实际上什么也没做,如果它被调用时使用@jit(nopython=True),它会给出一个错误消息,表明它无法优化那里的任何东西。然而,由于Numba经常可以加速以函数方式编写的代码,因此为了完整性考虑将其包括在内。
import timeit
from numpy import array, arange, vectorize, rint, frompyfunc
from numba import autojit

# SETUP
get_array = lambda side : arange(side**2).reshape(side, side) * 30
dim = lambda x : int(round(x * 0.67328))

# TIMER
def best(fname, reps, side):
    global a
    a = get_array(side)
    t = timeit.Timer('%s(a)' % fname,
                     setup='from __main__ import %s, a' % fname)
    return min(t.repeat(reps, 3))  #low num as in place --> converge to 1

# FUNCTIONS
def mac(array_):
    for row in range(len(array_)):
        for col in range(len(array_[0])):
            array_[row][col] = dim(array_[row][col])

def mac_two(array_):
    li = range(len(array_[0]))
    for row in range(len(array_)):
        for col in li:
            array_[row][col] = int(round(array_[row][col] * 0.67328))

def mac_three(array_):
    for i, row in enumerate(array_):
        array_[i][:] = [int(round(v * 0.67328)) for v in row]


def senderle(array_):
    array_ = array_.reshape(-1)
    for i, v in enumerate(array_):
        array_[i] = dim(v)

def eryksun(array_):
    array_[:] = vectorize(dim)(array_)

@autojit
def numba(array_):
    for row in range(len(array_)):
        for col in range(len(array_[0])):
            array_[row][col] = dim(array_[row][col])


def ufunc_ed(array_):
    multiplied = array_ * 0.67328
    array_[:] = rint(multiplied)

def ufunc_frompyfunc(array_):
    udim = frompyfunc(dim,1,1)
    array_ = udim(array_)
    array_.astype("int")

# MAIN
r = []
totest = ('mac', 'mac_two', 'mac_three', 'senderle', 'eryksun', 'numba','ufunc_ed','ufunc_frompyfunc')
for fname in totest:
    print('\nTesting `%s`...' % fname)
    r.append(best(fname, reps=50, side=50))
    # The following is for visually checking the functions returns same results
    tmp = get_array(3)
    eval('%s(tmp)' % fname)
    print (tmp)
tmp = min(r)/100
results = list(zip(totest,r))
results.sort(key=lambda x: x[1])

print('\n===== ...AND THE WINNER IS... =========================')
for name,time in results:
    Out = '{:<34}: {:8.4f}ms [{:5.0f}%]'.format(name,time*1000,time/tmp)
    print(Out)
print('=======================================================\n')

最后,结果如下:
===== ...AND THE WINNER IS... =========================
ufunc_ed                          :   0.3205ms [  100%]
ufunc_frompyfunc                  :   3.8280ms [ 1194%]
eryksun                           :   3.8989ms [ 1217%]
mac_three                         :  21.4538ms [ 6694%]
senderle                          :  22.6421ms [ 7065%]
mac_two                           :  24.6230ms [ 7683%]
mac                               :  26.1463ms [ 8158%]
numba                             :  27.5041ms [ 8582%]
=======================================================

2
如果不支持ufuncs,您可以考虑使用cython。它很容易集成,并且可以在特定使用numpy数组的情况下获得显着的加速。

真 (+1)。 - mac

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接