使用多个自定义索引范围构建numpy数组而无需显式循环

6
在Numpy中,有没有一种Pythonic的方法可以从array1和array2创建custom ranges数组array3,而无需使用循环? 迭代范围的简单解决方案有效,但由于我的数组达到了数百万个项目,因此我正在寻找更高效的解决方案(可能是语法糖)。 例如,
array1 = np.array([10, 65, 200]) 
array2 = np.array([14, 70, 204])
array3 = np.concatenate([np.arange(array1[i], array2[i]) for i in
                         np.arange(0,len(array1))])

print array3

结果: [10,11,12,13,65,66,67,68,69,200,201,202,203].


2
如果代码能正常运行、清晰易懂且足够快,则它就是“pythonic”的。 numpy-onic 要求消除显式循环。 :) - hpaulj
另一个友好的提醒查询: 这些解决方案有没有对您有用? - Divakar
@Divakar 很抱歉我回复晚了。所有的回答都很好,最终我使用了您的解决方案。非常优雅,谢谢您分享您的思路。这些数据正在运作中,而我一直在度假直到今天。我想将所有的函数收集在这里,以便运行在我的数据上检查性能,因此还没有回复。 - snowmonkey
@snowmonkey 哦,没关系!很高兴终于收到你的回复! :) - Divakar
4个回答

5
假设范围不重叠,您可以构建一个掩码,在索引在由array1array2指定的范围之间为非零,然后使用np.flatnonzero获取索引数组--所需的array3
import numpy as np

array1 = np.array([10, 65, 200]) 
array2 = np.array([14, 70, 204])

first, last = array1.min(), array2.max()
array3 = np.zeros(last-first+1, dtype='i1')
array3[array1-first] = 1
array3[array2-first] = -1
array3 = np.flatnonzero(array3.cumsum())+first
print(array3)

产量
[ 10  11  12  13  65  66  67  68  69 200 201 202 203]

对于较大的array1,使用using_flatnonzero可能比using_loop更快:

def using_flatnonzero(array1, array2):
    first, last = array1.min(), array2.max()
    array3 = np.zeros(last-first+1, dtype='i1')
    array3[array1-first] = 1
    array3[array2-first] = -1
    return np.flatnonzero(array3.cumsum())+first

def using_loop(array1, array2):
    return np.concatenate([np.arange(array1[i], array2[i]) for i in
                           np.arange(0,len(array1))])


array1, array2 = (np.random.choice(range(1, 11), size=10**4, replace=True)
                  .cumsum().reshape(2, -1, order='F'))


assert np.allclose(using_flatnonzero(array1, array2), using_loop(array1, array2))

In [260]: %timeit using_loop(array1, array2)
100 loops, best of 3: 9.36 ms per loop

In [261]: %timeit using_flatnonzero(array1, array2)
1000 loops, best of 3: 564 µs per loop

如果范围重叠,那么using_loop将返回一个包含重复项的array3。而using_flatnonzero则返回一个没有重复项的数组。 解释:让我们来看一个小例子。
array1 = np.array([10, 65, 200]) 
array2 = np.array([14, 70, 204])

目标是构建一个类似下面的数组goal。1的位置位于索引值[10, 11, 12, 13, 65, 66, 67, 68, 69, 200, 201, 202, 203](即array3)。
In [306]: goal
Out[306]: 
array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,
       1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1], dtype=int8)

一旦我们有了goal数组,就可以通过调用np.flatnonzero来获得array3

In [307]: np.flatnonzero(goal)
Out[307]: array([ 10,  11,  12,  13,  65,  66,  67,  68,  69, 200, 201, 202, 203])

goal的长度与array2.max()相同:

In [308]: array2.max()
Out[308]: 204

In [309]: goal.shape
Out[309]: (204,)

因此,我们可以开始分配

goal = np.zeros(array2.max()+1, dtype='i1')

然后在由array1给出的索引位置处填入1,在由array2给出的索引位置处填入-1:

In [311]: goal[array1] = 1
In [312]: goal[array2] = -1
In [313]: goal
Out[313]: 
array([ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,  0, -1,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,
        0,  0, -1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,  0,
       -1], dtype=int8)

现在应用 cumsum(累计求和)可以生成所需的 goal 数组:
In [314]: goal = goal.cumsum(); goal
Out[314]: 
array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,
       1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0])

In [315]: np.flatnonzero(goal)
Out[315]: array([ 10,  11,  12,  13,  65,  66,  67,  68,  69, 200, 201, 202, 203])

这就是使用using_flatnonzero的主要思想。减去first只是为了节省一点内存。

3

前瞻性方法

我将逆向思考如何解决这个问题。

以问题中列出的示例为例,我们有:

array1 = np.array([10, 65, 200])
array2 = np.array([14, 70, 204])

现在,让我们看一下所需的结果 -
result: [10,11,12,13,65,66,67,68,69,200,201,202,203]

让我们计算分组长度,因为我们需要这些来解释下一步的解决方案。
In [58]: lens = array2 - array1

In [59]: lens
Out[59]: array([4, 5, 4])

这个想法是使用初始化为1的数组,当整个长度进行累加时,可以得到所需的结果。 这个累加求和将是我们解决方案的最后一步。 为什么要初始化为1?因为我们有一个数组,它以1的步长递增,除了在特定位置有移位对应新组的出现。

现在,由于cumsum是最后一步,因此它之前的步骤应该给我们类似于-

array([ 10,   1,   1,   1,  52,   1,   1,   1,   1, 131,   1,   1,   1])

如之前所讨论的,1 的特定位置填充了 [10,52,131]。第一个值 10 来自于 array1 的第一个元素,但其余的值呢?
第二个值 52 通过 65-13(根据 result)得出,其中的 13 来自于以 10 开始的一组,并且由于第一组的长度为 4 而被计算。因此,如果我们进行计算:65 - 10 - 4,就可以得到 51,然后再加上边界停止符 1,我们就得到了期望的偏移量 52。类似地,我们可以得到 131
因此,这些“偏移值”可以按如下方式计算 -
In [62]: np.diff(array1) - lens[:-1]+1
Out[62]: array([ 52, 131])

接下来,为了获得这些发生转换的shifting-places,我们可以对分组长度进行累加求和 -
In [65]: lens[:-1].cumsum()
Out[65]: array([4, 9])

为了完整起见,我们需要在“移位位置”的数组和“移位值”的array1[0]之前添加0
所以,我们准备以逐步的方式呈现我们的方法!

拼装碎片

1] 获取每个组的长度:

lens = array2 - array1

2] 获取位移发生的索引和要放入1的初始化数组中的值:

shift_idx = np.hstack((0,lens[:-1].cumsum()))
shift_vals = np.hstack((array1[0],np.diff(array1) - lens[:-1]+1))

3] 设置1的初始化ID数组,用于在前一步中列出的那些索引处插入这些值:

id_arr = np.ones(lens.sum(),dtype=array1.dtype)
id_arr[shift_idx] = shift_vals

4] 最后对ID数组进行累积求和:

output = id_arr.cumsum() 

以函数格式列出,我们将有 -
def using_ones_cumsum(array1, array2):
    lens = array2 - array1
    shift_idx = np.hstack((0,lens[:-1].cumsum()))
    shift_vals = np.hstack((array1[0],np.diff(array1) - lens[:-1]+1))
    id_arr = np.ones(lens.sum(),dtype=array1.dtype)
    id_arr[shift_idx] = shift_vals
    return id_arr.cumsum()
  

它还可以在重叠的范围上工作!

In [67]: array1 = np.array([10, 11, 200]) 
    ...: array2 = np.array([14, 18, 204])
    ...: 

In [68]: using_ones_cumsum(array1, array2)
Out[68]: 
array([ 10,  11,  12,  13,  11,  12,  13,  14,  15,  16,  17, 200, 201,
       202, 203])

运行时测试

我们来比较一下拟议方法和@unutbu的基于flatnonzero的矢量化方法,后者已经证明比循环方法要好得多 -

In [38]: array1, array2 = (np.random.choice(range(1, 11), size=10**4, replace=True)
    ...:                   .cumsum().reshape(2, -1, order='F'))

In [39]: %timeit using_flatnonzero(array1, array2)
1000 loops, best of 3: 889 µs per loop

In [40]: %timeit using_ones_cumsum(array1, array2)
1000 loops, best of 3: 235 µs per loop

改进!

现在,从代码层面来看,NumPy 不支持添加操作。因此,可以避免使用 np.hstack 函数,以获得稍微改进的版本,如下所示 -

def get_ranges_arr(starts,ends):
    counts = ends - starts
    counts_csum = counts.cumsum()
    id_arr = np.ones(counts_csum[-1],dtype=int)
    id_arr[0] = starts[0]
    id_arr[counts_csum[:-1]] = starts[1:] - ends[:-1] + 1
    return id_arr.cumsum()

让我们对比一下我们原来的方法 -

In [151]: array1,array2 = (np.random.choice(range(1, 11),size=10**4, replace=True)\
     ...:                                      .cumsum().reshape(2, -1, order='F'))

In [152]: %timeit using_ones_cumsum(array1, array2)
1000 loops, best of 3: 276 µs per loop

In [153]: %timeit get_ranges_arr(array1, array2)
10000 loops, best of 3: 193 µs per loop

因此,我们在那里获得了30%的性能提升!

@unutbu 谢谢!你的也很聪明! :) - Divakar

1
这是我将向量化连接结合的方法: 实现:
import numpy as np

array1, array2 = np.array([10, 65, 200]), np.array([14, 70, 204])

ranges = np.vectorize(lambda a, b: np.arange(a, b), otypes=[np.ndarray])
result = np.concatenate(ranges(array1, array2), axis=0)

print result
# [ 10  11  12  13  65  66  67  68  69 200 201 202 203]

Performance:

%timeit np.concatenate(ranges(array1, array2), axis=0)

100000次循环,3次中最佳结果:每次循环13.9微秒。

我预计vectorize相比于列表连接会有一个适度的速度提升,大约20%。但它仍然需要迭代。 - hpaulj

0
你是指这个吗?
In [440]: np.r_[10:14,65:70,200:204]
Out[440]: array([ 10,  11,  12,  13,  65,  66,  67,  68,  69, 200, 201, 202, 203])

或者概括一下:

In [454]: np.r_[tuple([slice(i,j) for i,j in zip(array1,array2)])]
Out[454]: array([ 10,  11,  12,  13,  65,  66,  67,  68,  69, 200, 201, 202, 203])

虽然这涉及到双重循环,一个用于生成切片,另一个在r_内部将切片转换为arange

    for k in range(len(key)):
        scalar = False
        if isinstance(key[k], slice):
            step = key[k].step
            start = key[k].start
                ...
                newobj = _nx.arange(start, stop, step)

我提到这个是因为它表明numpy的开发人员认为您的迭代方式是正常的。

我预计@unutbu的聪明,尽管有些晦涩(我还没有弄清楚它在做什么),但解决方案是您获得速度的最佳机会。cumsum是一个很好的工具,当您需要处理长度可能不同的范围时。它可能在处理许多小范围时获得最大收益。我认为它不能处理重叠的范围。

================

np.vectorize 使用 np.frompyfunc。因此,这个迭代也可以用以下方式表示:

In [467]: f=np.frompyfunc(lambda x,y: np.arange(x,y), 2,1)

In [468]: f(array1,array2)
Out[468]: 
array([array([10, 11, 12, 13]), array([65, 66, 67, 68, 69]),
       array([200, 201, 202, 203])], dtype=object)

In [469]: timeit np.concatenate(f(array1,array2))
100000 loops, best of 3: 17 µs per loop

In [470]: timeit np.r_[tuple([slice(i,j) for i,j in zip(array1,array2)])]
10000 loops, best of 3: 65.7 µs per loop

使用 @Darius 的 vectorize 解决方案:

In [474]: timeit result = np.concatenate(ranges(array1, array2), axis=0)
10000 loops, best of 3: 52 µs per loop

vectorize 需要做一些额外的工作来允许更强大的广播使用。如果 array1 很大,则相对速度可能会发生变化。

@unutbu 的解决方案在这个小的 array1 上不是特别特殊。

In [478]: timeit using_flatnonzero(array1,array2)
10000 loops, best of 3: 57.3 µs per loop

原帖的解决方案,迭代式的没有我的r_中间人是不错的。

In [483]: timeit array3 = np.concatenate([np.arange(array1[i], array2[i]) for i in np.arange(0,len(array1))])
10000 loops, best of 3: 24.8 µs per loop

通常情况下,使用少量循环时,列表推导比更高级的numpy操作更快。
对于@unutbu的较大测试用例,我的计时与他的一致-速度提升了17倍。

===================

对于小样本数组,@Divakar的解决方案速度较慢,但对于大型数组而言,比@unutbu的快3倍。因此它具有更高的设置成本,但缩放速度较慢。

我喜欢你的比较。 - Darius

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接