不复制数组的替代numpy roll方法

11
我正在做以下代码的类似操作,但是np.roll()函数的性能令我不满意。我正在将baseArray和otherArray相加,其中在每次迭代中,baseArray都会向前滚动一个元素。但是当我滚动它时,我并不需要baseArray的副本,我更喜欢一个视图,例如当我将baseArray与其他数组相加时,如果baseArray被滚动了两次,那么basearray的第二个元素将与otherArray的第零个元素相加,baseArray的第三个元素将与otherArray的第一元素相加等等。
即实现与np.roll()相同的结果,但不复制数组。
import numpy as np
from numpy import random
import cProfile

def profile():
    baseArray = np.zeros(1000000)
    for i in range(1000):
        baseArray= np.roll(baseArray,1)
        otherArray= np.random.rand(1000000)
        baseArray=baseArray+otherArray

cProfile.run('profile()')

输出(请注意第三行的roll函数):

         9005 function calls in 26.741 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    5.123    5.123   26.740   26.740 <ipython-input-101-9006a6c0d2e3>:5(profile)
        1    0.001    0.001   26.741   26.741 <string>:1(<module>)
     1000    0.237    0.000    8.966    0.009 numeric.py:1327(roll)
     1000    0.004    0.000    0.005    0.000 numeric.py:476(asanyarray)
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
     1000   12.650    0.013   12.650    0.013 {method 'rand' of 'mtrand.RandomState' objects}
     1000    0.005    0.000    0.005    0.000 {method 'reshape' of 'numpy.ndarray' objects}
     1000    6.390    0.006    6.390    0.006 {method 'take' of 'numpy.ndarray' objects}
     2000    1.345    0.001    1.345    0.001 {numpy.core.multiarray.arange}
     1000    0.001    0.000    0.001    0.000 {numpy.core.multiarray.array}
     1000    0.985    0.001    0.985    0.001 {numpy.core.multiarray.concatenate}
        1    0.000    0.000    0.000    0.000 {numpy.core.multiarray.zeros}
        1    0.000    0.000    0.000    0.000 {range}
2个回答

5

我相信由于numpy数组内部的表示方式,避免复制是不可能的一个数组由一块连续的内存地址和一些元数据组成,包括数组的维度、项大小和每个维度之间的元素分隔(“步幅”)。向前或向后“滚动”每个元素需要在同一维度上具有不同长度的步幅,这是不可能的。


话虽如此,您可以使用切片索引避免复制除baseArray之外的所有元素:

import numpy as np

def profile1(seed=0):
    gen = np.random.RandomState(seed)
    baseArray = np.zeros(1000000)
    for i in range(1000):
        baseArray= np.roll(baseArray,1)
        otherArray= gen.rand(1000000)
        baseArray=baseArray+otherArray
    return baseArray

def profile2(seed=0):
    gen = np.random.RandomState(seed)
    baseArray = np.zeros(1000000)
    for i in range(1000):
        otherArray = gen.rand(1000000)
        tmp1 = baseArray[:-1]               # view of the first n-1 elements
        tmp2 = baseArray[-1]                # copy of the last element
        baseArray[1:]=tmp1+otherArray[1:]   # write the last n-1 elements
        baseArray[0]=tmp2+otherArray[0]     # write the first element
    return baseArray

这些将会给出相同的结果:

In [1]: x1 = profile1()

In [2]: x2 = profile2()

In [3]: np.allclose(x1, x2)
Out[3]: True

实际上,在性能方面并没有太大的区别:

In [4]: %timeit profile1()
1 loop, best of 3: 23.4 s per loop

In [5]: %timeit profile2()
1 loop, best of 3: 17.3 s per loop

12
谢谢。仅仅是一条评论:实际上性能存在差异,因为您测量的23.4和17.3秒包括生成随机数(在我的真实世界算法中我不会使用),如果只比较np.roll()性能,例如通过在for循环之前放置otherArray创建,则我的时间是14 vs 4秒。 - Marcel

2

我的函数profile3()比其他函数快了四倍。在累加过程中,它使用递增移位的切片索引而不是任何滚动操作。循环结束后,将1000个元素进行一次滚动操作即可与其他函数实现相同的对齐效果。

import numpy as np
from timeit import timeit

def profile1(seed=0):
    gen = np.random.RandomState(seed)
    otherArray= gen.rand(1000000)           # outside the loop after Marcel's comment above
    baseArray = np.zeros(1000000)
    for i in range(1000):
        baseArray= np.roll(baseArray,1)
        baseArray=baseArray+otherArray
    return baseArray

def profile2(seed=0):
    gen = np.random.RandomState(seed)
    otherArray= gen.rand(1000000)
    baseArray = np.zeros(1000000)
    for i in range(1000):
        tmp1 = baseArray[:-1]               # view of the first n-1 elements
        tmp2 = baseArray[-1]                # copy of the last element
        baseArray[1:]=tmp1+otherArray[1:]   # write the last n-1 elements
        baseArray[0]=tmp2+otherArray[0]     # write the first element
    return baseArray

def profile3(seed=0):
    gen = np.random.RandomState(seed)
    otherArray= gen.rand(1000000)
    baseArray = np.zeros(1000000)
    for i in range(1,1001): # use % or itertools.cycle if range > shape
        baseArray[:-i] += otherArray[i:]
        baseArray[-i:] += otherArray[:i]
    return np.roll(baseArray,1000)

print(timeit(profile1,number=1))  # 7.0
print(timeit(profile2,number=1))  # 4.7
print(timeit(profile3,number=1))  # 1.2

x2 = profile2()
x3 = profile3()
print(np.allclose(x2, x3))  # True

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接