从1D数组高效构建Numpy二维数组

38

我有一个这样的数组:

A = array([1,2,3,4,5,6,7,8,9,10])

我正在尝试获取这样的数组:

B = array([[1,2,3],
          [2,3,4],
          [3,4,5],
          [4,5,6]])

每一行(宽度固定)都向右移动一个单位。数组 A 的长度为 10k 条记录,我正在尝试在 Numpy 中找到一种高效的方法来实现此操作。目前,我正在使用 vstack 和 for 循环,但速度很慢。是否有更快的方法?

width = 3 # fixed arbitrary width
length = 10000 # length of A which I wish to use
B = A[0:length + 1]
for i in range (1, length):
    B = np.vstack((B, A[i, i + width + 1]))

你能贴出你的vstack/loop解决方案吗? - eumiro
@wxbx:请详细说明您的目标是什么?请注意,B = array([1,2,3],[2,3,4],[3,4,5],[4,5,6]) 不是有效的 numpy - eat
@wxbx - 你的解决方案真的很不幸。你把数组 vstack 了10000次!看看我的答案,我只vstack了一次。 - eumiro
@wxbx:你的 B= array([1, ... 仍然不是有效的 numpy 代码。但你接受答案的速度真的很快;-)。谢谢。 - eat
@wxbx:我当时并不是在指出我的答案。现在你看,有很多好的答案。谢谢。 - eat
显示剩余5条评论
7个回答

61
实际上,有一种更有效的方法来做到这一点...使用vstack等的缺点是你正在复制数组。
顺便说一下,这与@Paul的答案基本相同,但我发表这篇文章只是为了更详细地解释一些东西...
有一种方法可以只使用视图来完成这个过程,以便不会复制任何内存。
我直接从Erik Rigtorp的帖子numpy-discussion中借鉴了这个方法,他又从Keith Goodman的Bottleneck(非常有用!)中借鉴了这个方法。
基本技巧是直接操作数组的步幅(对于一维数组):
import numpy as np

def rolling(a, window):
    shape = (a.size - window + 1, window)
    strides = (a.itemsize, a.itemsize)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

a = np.arange(10)
print rolling(a, 3)

其中a是您的输入数组,window是您想要的窗口长度(在您的情况下为3)。

这将产生以下结果:

[[0 1 2]
 [1 2 3]
 [2 3 4]
 [3 4 5]
 [4 5 6]
 [5 6 7]
 [6 7 8]
 [7 8 9]]

然而,原始的a和返回的数组之间绝不会有任何内存重复。这意味着它比其他选项更快且可扩展性明显更好

例如(使用a = np.arange(100000)window=3):

%timeit np.vstack([a[i:i-window] for i in xrange(window)]).T
1000 loops, best of 3: 256 us per loop

%timeit rolling(a, window)
100000 loops, best of 3: 12 us per loop

如果我们将这个概念推广到N维数组的最后一个轴上的“滚动窗口”,我们就可以得到Erik Rigtorp的“滚动窗口”函数:
import numpy as np

def rolling_window(a, window):
   """
   Make an ndarray with a rolling window of the last dimension

   Parameters
   ----------
   a : array_like
       Array to add rolling window to
   window : int
       Size of rolling window

   Returns
   -------
   Array that is a view of the original array with a added dimension
   of size w.

   Examples
   --------
   >>> x=np.arange(10).reshape((2,5))
   >>> rolling_window(x, 3)
   array([[[0, 1, 2], [1, 2, 3], [2, 3, 4]],
          [[5, 6, 7], [6, 7, 8], [7, 8, 9]]])

   Calculate rolling mean of last dimension:
   >>> np.mean(rolling_window(x, 3), -1)
   array([[ 1.,  2.,  3.],
          [ 6.,  7.,  8.]])

   """
   if window < 1:
       raise ValueError, "`window` must be at least 1."
   if window > a.shape[-1]:
       raise ValueError, "`window` is too long."
   shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
   strides = a.strides + (a.strides[-1],)
   return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

那么,让我们来看看这里发生了什么......操纵数组的strides可能看起来有点神奇,但一旦你理解了其中的原理,就不难了。numpy数组的步幅描述了沿着给定轴增加一个值所必须采取的步骤的字节数大小。因此,在64位浮点数的1维数组的情况下,每个项目的长度为8个字节,x.strides(8,)

x = np.arange(9)
print x.strides

现在,如果我们将其转换为一个二维的3x3数组,步幅将是(3 * 8, 8),因为我们需要跳过24个字节才能沿第一轴增加一步,并且需要跳过8个字节才能沿第二轴增加一步。
y = x.reshape(3,3)
print y.strides

同样,转置操作等同于仅仅颠倒数组的步长:
print y
y.strides = y.strides[::-1]
print y

显然,一个数组的步幅(strides)和形状(shape)是密切相关的。如果改变其中一个,我们必须相应地改变另一个,否则就无法对实际保存数组值的内存缓冲区进行有效描述。

因此,如果您想同时改变数组的形状和大小,您不能仅通过设置x.stridesx.shape来完成,即使新的步幅和形状是兼容的。

这就是numpy.lib.as_strided所起的作用。它实际上是一个非常简单的函数,只需同时设置数组的步幅和形状即可。

它检查两者是否兼容,但不检查旧的步幅和新形状是否兼容,因为这会发生在将两者分别设置时。(它实际上是通过numpy的__array_interface__来完成的,该接口允许任意类将内存缓冲区描述为numpy数组.)

因此,我们所做的一切都是使它沿一个轴向前移动一个项目(64位数组的8个字节),但同时仅向前移动8个字节

换句话说,在窗口大小为3的情况下,数组的形状为(whatever, 3),但在第二个维度上不是完全移动3*x.itemsize,而是仅向前移动一个项目,从而有效地使新数组的行成为原始数组的“移动窗口”视图。

这也意味着对于你的新数组,x.shape[0] * x.shape[1]不会与x.size相同。
无论如何,希望这能让事情稍微清晰一些...

Kinggton:我非常欣赏你的回答,但是你不觉得这对于OP的问题有点过度了吗?;-)。谢谢。 - eat
5
@eat - 是的! :) 对于一个较短的数组来说这绝对是过度杀伤力了(而且OP的10K元素数组相当短),但了解它仍然很有用。老实说,有时我只是喜欢写过长的答案... - Joe Kington
1
金斯顿:非常感谢你提供如此详细的答案,我从中学到了很多。我还将你的代码与@eumiro的答案进行了基准测试,你的滚动答案使我的程序速度提高了60倍!考虑到我计划在一个更大的数组上使用它,这种加速非常有用。 :) - wxbx
非常感谢你提供这么好的答案!我从来不知道NumPy底层还有这样的东西。绝对值得一看——__array_interface__也非常酷炫! - jolly

11

由于在使用NumPy数组时,需要避免各种类型检查,因此通过Python循环实现的这种解决方案效率不高。如果您的数组非常高,则使用以下方法可以提高速度:

newshape = (4,3)
newstrides = (A.itemsize, A.itemsize)
B = numpy.lib.stride_tricks.as_strided(A, shape=newshape, strides=newstrides)

这提供了数组A的一个视图。如果你想要一个可以编辑的新数组,做同样的事情但在结尾加上.copy()

步幅的细节:

在这种情况下,newstrides元组将是(4,4),因为数组具有4字节项,您希望在单个项步长中继续沿i维遍历数据。第二个值'4'指的是j维度中的步幅(在正常的4x4数组中,它将是16)。因为在这种情况下,您还希望按4字节步长递增缓冲区中的读取,在j维度中。

当Joe说这个技巧只是同时改变步幅和形状时,他给出了一个不错的、详细的描述,并使事情清晰易懂。


1
+1 你比我快!我正在输入这个... 我还是会发布我的答案,因为它会更详细一些。此外,你的 strides=(4,4) 假设 A.itemsize 是 4(即32位浮点数或整数)。最好使用 strides=(A.itemsize, A.itemsize) - Joe Kington
你能指引我去哪里找这个函数的文档吗?我以前从未见过它... - Benjamin
1
谢谢Joe。我在寻找一些在线文档来链接,但是并没有太多的选择!这是我能找到的最好的:http://mentat.za.net/numpy/numpy_advanced_slides/。 - Paul
1
@Benjamin - 它只是同时设置数组的步幅和形状。这适用于需要同时更改两者的情况,但新步幅与旧形状不兼容,反之亦然,因此您不能只执行 x.strides = new_stridesx.shape = new_shape - Joe Kington
@Paul:到目前为止,没有任何答案中明确使用了Python循环。您能否详细说明一下“因为它们依赖于Python循环”是什么意思?谢谢。 - eat
显示剩余4条评论

2
你使用的是哪种方法?
import numpy as np
A = np.array([1,2,3,4,5,6,7,8,9,10])
width = 3

np.vstack([A[i:i-len(A)+width] for i in xrange(len(A)-width)])
# needs 26.3µs

np.vstack([A[i:i-width] for i in xrange(width)]).T
# needs 13.2µs

如果您的宽度相对较低(3),并且有一个很大的 A (10000 个元素),那么差异就更加重要:第一种情况需要32.4毫秒,而第二种情况只需要44微秒。

谢谢!这正是我所需要的!今天刚开始使用numpy,正在慢慢学习。 - wxbx

2

仅仅是进一步回答@Joe general的问题

import numpy as np
def rolling(a, window):
    step = 2 
    shape = ( (a.size-window)/step + 1   , window)


    strides = (a.itemsize*step, a.itemsize)

    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

a = np.arange(10)

print rolling(a, 3)

输出结果为:

[[0 1 2]
 [2 3 4]
 [4 5 6]
 [6 7 8]]

为了进一步概括2D情况,即从图像中提取补丁,请使用它。
def rolling2d(a,win_h,win_w,step_h,step_w):

    h,w = a.shape
    shape = ( ((h-win_h)/step_h + 1)  * ((w-win_w)/step_w + 1) , win_h , win_w)

    strides = (step_w*a.itemsize, h*a.itemsize,a.itemsize)


    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)

a = np.arange(36).reshape(6,6)
print a
print rolling2d (a,3,3,2,2)

输出如下:

[[ 0  1  2  3  4  5]
 [ 6  7  8  9 10 11]
 [12 13 14 15 16 17]
 [18 19 20 21 22 23]
 [24 25 26 27 28 29]
 [30 31 32 33 34 35]]
[[[ 0  1  2]
  [ 6  7  8]
  [12 13 14]]

 [[ 2  3  4]
  [ 8  9 10]
  [14 15 16]]

 [[ 4  5  6]
  [10 11 12]
  [16 17 18]]

 [[ 6  7  8]
  [12 13 14]
  [18 19 20]]]

1
在上面的例子中,是否有可能不获取环绕到原始数组右侧的结果。例如,第三个输出[4,5,6;10,11,12;16,17,18]会“环绕”回来。对于图像处理,我想避免这种情况,只需跳到下一个返回的结果即可。 - Phil Glau

1
请看:view_as_windows
import numpy as np
from skimage.util.shape import view_as_windows
window_shape = (4, )
aa = np.arange(1000000000) # 1 billion
bb = view_as_windows(aa, window_shape)

大约1秒钟。

1

当宽度固定在较低的数字时,我认为这可能比循环更快...

import numpy
a = numpy.array([1,2,3,4,5,6])
b = numpy.reshape(a, (numpy.shape(a)[0],1))
b = numpy.concatenate((b, numpy.roll(b,-1,0), numpy.roll(b,-2,0)), 1)
b = b[0:(numpy.shape(a)[0]/2) + 1,:]

显然,使用步幅的解决方案比这个更优秀,唯一的主要缺点是它们尚未得到很好的记录...


1

我正在使用一种更加通用的函数,类似于@JustInTime的函数,但适用于ndarray

def sliding_window(x, size, overlap=0):
    step = size - overlap # in npts
    nwin = (x.shape[-1]-size)//step + 1
    shape = x.shape[:-1] + (nwin, size)
    strides = x.strides[:-1] + (step*x.strides[-1], x.strides[-1])
    return stride_tricks.as_strided(x, shape=shape, strides=strides)

一个例子,
x = np.arange(10)
M.sliding_window(x, 5, 3)
Out[1]: 
array([[0, 1, 2, 3, 4],
       [2, 3, 4, 5, 6],
       [4, 5, 6, 7, 8]])


x = np.arange(10).reshape((2,5))
M.sliding_window(x, 3, 1)
Out[2]: 
array([[[0, 1, 2],
        [2, 3, 4]],

       [[5, 6, 7],
        [7, 8, 9]]])

谢谢分享这个。请注意,如果窗口不完全匹配,则会截断最后一行(在您的第一个示例中,它会丢失应包含“9”的行)。但是,如果将“nwin”行更改为nwin = int(np.ceil((x.shape[-1]-size)/step + 1)),则似乎会用零填充结果。(有点惊讶这不会导致一些段错误,但我猜这是内置于stride_tricks中的。) - sh37211

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接