按行对NumPy数组进行索引

5

假设我有一个NumPy数组:

>>> X = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
>>> X
array([[ 1,  2,  3,  4],
       [ 5,  6,  7,  8],
       [ 9, 10, 11, 12]])

我有一个数组,其中包含每行要选择的索引:

>>> ixs = np.array([[1, 3], [0, 1], [1, 2]])
>>> ixs
array([[1, 3],
       [0, 1],
       [1, 2]])

如何对数组X进行索引,使得对于X中的每一行,我都可以选择ixs指定的两个索引?

因此,在这种情况下,我想为第一行选择元素1和3,为第二行选择元素0和1,依此类推。输出应该是:

array([[2, 4],
       [5, 6],
       [10, 11]])

一个缓慢的解决方案可能是这样的: output = np.array([row[ix] for row, ix in zip(X, ixs)]) 然而,对于非常长的数组,这可能会变得很慢。有没有一种更快速地使用NumPy而不使用循环来实现此功能的方法?
编辑:对于一个2.5K * 1M数组和2K宽的ixs(10GB)进行了一些非常近似的速度测试: np.array([row[ix] for row, ix in zip(X, ixs)]) 0.16秒 X[np.arange(len(ixs)), ixs.T].T 0.175秒 X.take(idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None]) 33秒 np.fromiter((X[i, j] for i, row in enumerate(ixs) for j in row), dtype=X.dtype).reshape(ixs.shape) 2.4秒

这将产生一个接近您所需的数组:X [:,ixs]。 有人可以在此基础上继续吗? - Bill
你认为并行化是一个可接受的解决方案吗? - Mohsin
你能像这样重构ixs吗:ixs2 = np.array(((0, 1), (0, 3), (1, 0), ...))?如果可以的话,那么X[ixs2[:,0], ixs2[:,1]]应该能得到你所需的结果。 - Bill
不要@MohsinBukhari,我已经在更高的循环中进行了并行处理。我也认为并行化在这里没有帮助,因为进程间传递信息很慢。 - mxbi
嗯,你尝试过使用numba JIT的直接for循环实现吗? - juanpa.arrivillaga
4个回答

7
您可以使用以下内容:
X[np.arange(len(ixs)), ixs.T].T

这里 是复杂索引的参考文献。


这是一个更整洁的解决方案,但在我的(大约)测试中实际上要慢一些。我测量了原始版本的1.15秒和1.75秒。 - mxbi
我的数组是10K * 1M @juanpa.arrivillaga - 尽管我最初的测试是在一个小得多的数组上进行的。在大数组上,这个版本只比列表推导略慢一点。 - mxbi
@juanpa.arrivillaga 不,OP 说的是 10K * 1M 矩阵。如果是这样的话,内存大小为80G(10G longlong 是8个字节)。 - llllllllll
@mxbi 这对我来说听起来很奇怪。但是我手头没有测量的机器。如果你感兴趣,也许你可以编写一个 C++ 程序来做同样的事情(应该只需要几行代码),并查看经过的时间。 - llllllllll
@juanpa.arrivillaga 这只是为了测量目的而编写的几行代码。我很好奇为什么这种方法会像Python循环一样慢。 - llllllllll
显示剩余14条评论

3

我相信你可以这样使用.take:

In [185]: X
Out[185]:
array([[ 1,  2,  3,  4],
       [ 5,  6,  7,  8],
       [ 9, 10, 11, 12]])

In [186]: idx
Out[186]:
array([[1, 3],
       [0, 1],
       [1, 2]])

In [187]: X.take(idx + (np.arange(X.shape[0]) * X.shape[1]).reshape(-1, 1))
Out[187]:
array([[ 2,  4],
       [ 5,  6],
       [10, 11]])

如果你的数组维度非常大,使用以下方式可能更快,尽管不太美观:
idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None]

只是为了好玩,看看下面的表现如何:

np.fromiter((X[i, j] for i, row in enumerate(ixs) for j in row), dtype=X.dtype, count=ixs.size).reshape(ixs.shape)

编辑以添加时间

In [15]: X = np.arange(1000*10000, dtype=np.int32).reshape(1000,-1)

In [16]: ixs = np.random.randint(0, 10000, (1000, 2))

In [17]: ixs.sort(axis=1)

In [18]: ixs
Out[18]:
array([[2738, 3511],
       [3600, 7414],
       [7426, 9851],
       ...,
       [1654, 8252],
       [2194, 8200],
       [5497, 8900]])

In [19]: %timeit  np.array([row[ix] for row, ix in zip(X, ixs)])
928 µs ± 23.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [20]: %timeit X[np.arange(len(ixs)), ixs.T].T
23.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [21]: %timeit X.take(idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])
20.6 µs ± 530 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [22]: %timeit np.fromiter((X[i, j] for i, row in enumerate(ixs) for j in row), dtype=X.dtype, count=ixs.size).reshape(ixs.shape)
1.42 ms ± 9.94 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

@mxbi,我添加了一些时间,我的结果与你的不太一致,你应该检查一下

这是一个更大的数组:

In [33]: X = np.arange(10000*100000, dtype=np.int32).reshape(10000,-1)

In [34]: ixs = np.random.randint(0, 100000, (10000, 2))

In [35]: ixs.sort(axis=1)

In [36]: X.shape
Out[36]: (10000, 100000)

In [37]: ixs.shape
Out[37]: (10000, 2)

有一些结果:

In [42]: %timeit  np.array([row[ix] for row, ix in zip(X, ixs)])
11.4 ms ± 177 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [43]: %timeit X[np.arange(len(ixs)), ixs.T].T
596 µs ± 17.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [44]: %timeit X.take(ixs+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])
540 µs ± 16.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

现在,我们使用500个索引而不是两个,我们看到列表推导式开始胜出:

In [45]: ixs = np.random.randint(0, 100000, (10000, 500))

In [46]: ixs.sort(axis=1)

In [47]: %timeit  np.array([row[ix] for row, ix in zip(X, ixs)])
93 ms ± 1.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [48]: %timeit X[np.arange(len(ixs)), ixs.T].T
133 ms ± 638 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [49]: %timeit X.take(ixs+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])
87.5 ms ± 1.13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

这个版本的执行速度比较慢。在一个 10K x 1M 的矩阵上,与列表推导式相比,更快的版本需要 33 秒才能完成,而列表推导式只需要 170 毫秒。 - mxbi
@mxbi,10G的“long”确实太多了,瓶颈可能是操作系统分页。 - llllllllll
@mxbi 你试过这个版本吗:idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None],我不认为它会快到哪里去,但我很好奇它是否比其他方法更快... - juanpa.arrivillaga
@mxbi 在 fromiter 中添加 count 参数会有明显的差异吗? - juanpa.arrivillaga
不超出误差范围。看起来差不多。 - mxbi
显示剩余5条评论

1
通常对于从行中索引项目的建议是:
X[np.arange(X.shape[0])[:,None], ixs]

即创建一个形状为(n,1)的行索引(列向量),它将与形状为(n,m)的ixs广播以给出(n,m)的解决方案。

这基本上与以下相同:

X[np.arange(len(ixs)), ixs.T].T

这段代码是针对一个(m,n)的数组进行(n,)维度的广播和转置操作。

时间复杂度基本相同:

In [299]: X = np.ones((1000,2000))
In [300]: ixs = np.random.randint(0,2000,(1000,200))
In [301]: timeit X[np.arange(len(ixs)), ixs.T].T
6.58 ms ± 71.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [302]: timeit X[np.arange(X.shape[0])[:,None], ixs]
6.57 ms ± 129 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

并且用于比较:

In [307]: timeit np.array([row[ix] for row, ix in zip(X, ixs)])
6.63 ms ± 229 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

我有点惊讶这个列表推导式表现得如此出色。我想知道当维度变化时相对优势的比较情况,特别是在 Xixs 的相对形状(长,宽等)方面。


第一种解决方案是由ix_生成的索引样式:
In [303]: np.ix_(np.arange(3), np.arange(2))
Out[303]: 
(array([[0],
        [1],
        [2]]), array([[0, 1]]))

0

这应该可以正常工作

[X[i][[y]] for i, y in enumerate(ixs)] 

编辑:我刚注意到您想要无循环解决方案。


这个解决方案也非常接近我的原始解决方案(如果我将其包装在对np.array的调用中)。 - mxbi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接