如何在NumPy向量中的每个元素之间应用linspace?

4

我有以下numpy数组:

a = np.array([1,4,2])

我希望通过将 a 数组中的每个元素平均分成 5 份来创建一个新的数组,得到如下结果:
b = [1., 1.75, 2.5, 3.25, 4., 3.5, 3., 2.5, 2.]

如何在Python中高效地完成这个任务?
4个回答

6
你正在寻找一个一维数组的 线性插值,可以使用 NumPy.interp 来完成。
s = 4       # number of intervals between two numbers
l = (a.size - 1) * s + 1          # total length after interpolation
np.interp(np.arange(l), np.arange(l, step=s), a)        # interpolate

# array([1.  , 1.75, 2.5 , 3.25, 4.  , 3.5 , 3.  , 2.5 , 2.  ])

1
这非常好!谢谢你。由于代码涉及在for循环中处理大量数据,我正在寻找更多答案。我正在为一些代码进行timeit。感谢解决方案。 - naseefo
1
我的猜测是,这将是最佳解决方案,因为它使用编译的插值函数,而我和其他答案使用Python循环。 - tch

1
另一个选项使用 arange
import numpy as np
a = np.array([1,4,2])

res = np.array([float(a[-1])])
  for x, y in zip(a, a[1:]):
  res = np.insert(res, -1, np.array(np.arange(x,y,(y-x)/4)))

print(res)
#=> [1.   1.75 2.5  3.25 4.   3.5  3.   2.5  2.  ]

1
我们可以使用矢量化的 linspace : create_ranges -
# https://dev59.com/s1kR5IYBdhLWcg3w7hbE#40624614/ @Divakar
def create_ranges(start, stop, N, endpoint=True):
    if endpoint==1:
        divisor = N-1
    else:
        divisor = N
    steps = (1.0/divisor) * (stop - start)
    return steps[:,None]*np.arange(N) + start[:,None]

def ranges_based(a,N):
    ranges2D = create_ranges(a[:-1],a[1:],N-1,endpoint=False)
    return np.concatenate((ranges2D.ravel(),[a[-1]]))

示例运行 -

In [151]: a
Out[151]: array([1, 4, 2])

In [152]: ranges_based(a,N=5)
Out[152]: array([1.  , 1.75, 2.5 , 3.25, 4.  , 3.5 , 3.  , 2.5 , 2.  ])

矢量化解决方案的基准测试

# @Psidom's soln
def interp_based(a,N=5):
    s = N-1
    l = (a.size - 1) * s + 1    # total length after interpolation
    return np.interp(np.arange(l), np.arange(l, step=s), a)   

使用5间隔的大数组计时 -

In [199]: np.random.seed(0)

In [200]: a = np.random.randint(0,10,(10000))

In [201]: %timeit interp_based(a,N=5)
     ...: %timeit ranges_based(a,N=5)
1000 loops, best of 3: 318 µs per loop
1000 loops, best of 3: 227 µs per loop

In [202]: np.random.seed(0)

In [203]: a = np.random.randint(0,10,(100000))

In [204]: %timeit interp_based(a,N=5)
     ...: %timeit ranges_based(a,N=5)
100 loops, best of 3: 3.39 ms per loop
100 loops, best of 3: 2.77 ms per loop

使用间隔大于 50 的大型数组的时间 -

In [205]: np.random.seed(0)

In [206]: a = np.random.randint(0,10,(10000))

In [207]: %timeit interp_based(a,N=50)
     ...: %timeit ranges_based(a,N=50)
100 loops, best of 3: 3.65 ms per loop
100 loops, best of 3: 2.14 ms per loop

In [208]: np.random.seed(0)

In [209]: a = np.random.randint(0,10,(100000))

In [210]: %timeit interp_based(a,N=50)
     ...: %timeit ranges_based(a,N=50)
10 loops, best of 3: 43.4 ms per loop
10 loops, best of 3: 31.1 ms per loop

随着区间长度的增加,使用create_ranges可以获得更大的性能提升。
进一步改进: 我们可以通过在开头进行连接,然后在结尾进行切片,从而避免在那里进行连接,如下所示-
def ranges_based_v2(a,N):
    start = a
    stop = np.concatenate((a[1:],[0]))
    return create_ranges(start, stop, N-1, endpoint=False).ravel()[:-N+2]

使用间隔长度为550的较大数组进行定时 -

In [243]: np.random.seed(0)

In [244]: a = np.random.randint(0,10,(100000))

In [245]: %timeit interp_based(a,N=5)
     ...: %timeit ranges_based(a,N=5)
     ...: %timeit ranges_based_v2(a,N=5)
100 loops, best of 3: 3.38 ms per loop
100 loops, best of 3: 2.71 ms per loop
100 loops, best of 3: 2.49 ms per loop

In [246]: %timeit interp_based(a,N=50)
     ...: %timeit ranges_based(a,N=50)
     ...: %timeit ranges_based_v2(a,N=50)
10 loops, best of 3: 42.8 ms per loop
10 loops, best of 3: 30.1 ms per loop
10 loops, best of 3: 22.2 ms per loop

使用 numexpr 进一步提升性能

我们可以利用 numexpr 来发挥多核优势 -

# https://dev59.com/s1kR5IYBdhLWcg3w7hbE#40624614/ @Divakar
import numexpr as ne
def create_ranges_numexpr(start, stop, N, endpoint=True):
    if endpoint==1:
        divisor = N-1
    else:
        divisor = N
    s0 = start[:,None]
    s1 = stop[:,None]
    r = np.arange(N)
    return ne.evaluate('((1.0/divisor) * (s1 - s0))*r + s0')

def ranges_based_v3(a,N):
    start = a
    stop = np.concatenate((a[1:],[0]))
    return create_ranges_numexpr(start, stop, N-1, endpoint=False).ravel()[:-N+2]

计时 -
In [276]: np.random.seed(0)

In [277]: a = np.random.randint(0,10,(100000))

In [278]: %timeit interp_based(a,N=5)
     ...: %timeit ranges_based(a,N=5)
     ...: %timeit ranges_based_v2(a,N=5)
     ...: %timeit ranges_based_v3(a,N=5)
100 loops, best of 3: 3.39 ms per loop
100 loops, best of 3: 2.75 ms per loop
100 loops, best of 3: 2.49 ms per loop
1000 loops, best of 3: 1.17 ms per loop

In [279]: %timeit interp_based(a,N=50)
     ...: %timeit ranges_based(a,N=50)
     ...: %timeit ranges_based_v2(a,N=50)
     ...: %timeit ranges_based_v3(a,N=50)
10 loops, best of 3: 43.1 ms per loop
10 loops, best of 3: 31.3 ms per loop
10 loops, best of 3: 22.3 ms per loop
100 loops, best of 3: 11.4 ms per loop

感谢您的解决方案。我看到您比Psidolm快了1.29倍的结果。我尝试了一个适用于我的实际情况的场景-7000个元素,120个分区重复50次。ranges_based解决方案花费了320毫秒(比interp_based快1.8倍),而interp_based花费了554毫秒。arange解决方案花费了356秒。我想知道是否可以通过使用Cython将其进一步降低到20-30毫秒? - naseefo
@NaseefUmmer 我对Cython并不是很了解,所以对此没有任何建议。另外,我刚刚添加了ranges_based_v2,它似乎更好。所以,也请你去看看它! - Divakar
@NaseefUmmer 还添加了基于多核的版本以进一步提高性能。 - Divakar
你说得对!在7000个元素和120个重复的情况下,Psidom花费了26ms;range_based_v1:15ms(1.7倍);range_based_v2:10ms(2.5倍)。我之前提到的时间需要除以20。而且之前只重复了20次。我认为你成功地帮我达到了我的目标 :-) 我希望我能给这个点赞十次! - naseefo
第三版通过Cython只需7毫秒的时间。使用Cython后仅有微小差异。我只是为它添加了类型。不管怎样,非常感谢你的时间和努力,Divakar。 - naseefo
另外一个观察结果是:对于较少的元素,Psidolm的解决方案会更快。虽然这与我的当前问题无关,但我发现这很有趣。 - naseefo

0
你可以先创建一个起始点和结束点的数组,然后对该数组进行线性插值映射。
v=np.vstack([a[:-1],a[1:]])
ls = np.apply_along_axis(lambda x: np.linspace(*x,5),1,v)

最后一列包含重复的端点(除了最后一行)。我们可以使用掩码获取“正确”的元素。
mask = np.ones((len(a)-1,5),dtype='bool')
mask[:-1,-1] = 0

output = ls[mask]

请注意,您还可以使用切片和重塑来选择行。
output = np.zeros(5*(len(a)-1)-1)
output[:-1] = np.reshape(ls[:,:-1],-1)
output[-1] = a[-1]

它不起作用了。它在每一行中堆叠了三个元素。它应该只有两个端点,对吧? - naseefo
抱歉,我还没有完成答案。 - tch
我在这个实现中遇到了一些错误。第一个错误是IndexError:布尔索引与沿维度0索引的数组不匹配;维度为2,但相应的布尔维度为4。而对于第二个错误,它是ValueError:无法将形状为(2)的输入数组广播到形状为(18)的数组。 - naseefo
我想已经解决了。我从我的电脑上复制粘贴时弄错了一些东西。 - tch

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接