我有以下numpy数组:
a = np.array([1,4,2])
我希望通过将
a
数组中的每个元素平均分成 5 份来创建一个新的数组,得到如下结果:b = [1., 1.75, 2.5, 3.25, 4., 3.5, 3., 2.5, 2.]
如何在Python中高效地完成这个任务?
NumPy.interp
来完成。s = 4 # number of intervals between two numbers
l = (a.size - 1) * s + 1 # total length after interpolation
np.interp(np.arange(l), np.arange(l, step=s), a) # interpolate
# array([1. , 1.75, 2.5 , 3.25, 4. , 3.5 , 3. , 2.5 , 2. ])
arange
:import numpy as np
a = np.array([1,4,2])
res = np.array([float(a[-1])])
for x, y in zip(a, a[1:]):
res = np.insert(res, -1, np.array(np.arange(x,y,(y-x)/4)))
print(res)
#=> [1. 1.75 2.5 3.25 4. 3.5 3. 2.5 2. ]
矢量化的 linspace : create_ranges
-# https://dev59.com/s1kR5IYBdhLWcg3w7hbE#40624614/ @Divakar
def create_ranges(start, stop, N, endpoint=True):
if endpoint==1:
divisor = N-1
else:
divisor = N
steps = (1.0/divisor) * (stop - start)
return steps[:,None]*np.arange(N) + start[:,None]
def ranges_based(a,N):
ranges2D = create_ranges(a[:-1],a[1:],N-1,endpoint=False)
return np.concatenate((ranges2D.ravel(),[a[-1]]))
示例运行 -
In [151]: a
Out[151]: array([1, 4, 2])
In [152]: ranges_based(a,N=5)
Out[152]: array([1. , 1.75, 2.5 , 3.25, 4. , 3.5 , 3. , 2.5 , 2. ])
矢量化解决方案的基准测试
# @Psidom's soln
def interp_based(a,N=5):
s = N-1
l = (a.size - 1) * s + 1 # total length after interpolation
return np.interp(np.arange(l), np.arange(l, step=s), a)
使用5
间隔的大数组计时 -
In [199]: np.random.seed(0)
In [200]: a = np.random.randint(0,10,(10000))
In [201]: %timeit interp_based(a,N=5)
...: %timeit ranges_based(a,N=5)
1000 loops, best of 3: 318 µs per loop
1000 loops, best of 3: 227 µs per loop
In [202]: np.random.seed(0)
In [203]: a = np.random.randint(0,10,(100000))
In [204]: %timeit interp_based(a,N=5)
...: %timeit ranges_based(a,N=5)
100 loops, best of 3: 3.39 ms per loop
100 loops, best of 3: 2.77 ms per loop
使用间隔大于 50
的大型数组的时间 -
In [205]: np.random.seed(0)
In [206]: a = np.random.randint(0,10,(10000))
In [207]: %timeit interp_based(a,N=50)
...: %timeit ranges_based(a,N=50)
100 loops, best of 3: 3.65 ms per loop
100 loops, best of 3: 2.14 ms per loop
In [208]: np.random.seed(0)
In [209]: a = np.random.randint(0,10,(100000))
In [210]: %timeit interp_based(a,N=50)
...: %timeit ranges_based(a,N=50)
10 loops, best of 3: 43.4 ms per loop
10 loops, best of 3: 31.1 ms per loop
create_ranges
可以获得更大的性能提升。def ranges_based_v2(a,N):
start = a
stop = np.concatenate((a[1:],[0]))
return create_ranges(start, stop, N-1, endpoint=False).ravel()[:-N+2]
使用间隔长度为5
和50
的较大数组进行定时 -
In [243]: np.random.seed(0)
In [244]: a = np.random.randint(0,10,(100000))
In [245]: %timeit interp_based(a,N=5)
...: %timeit ranges_based(a,N=5)
...: %timeit ranges_based_v2(a,N=5)
100 loops, best of 3: 3.38 ms per loop
100 loops, best of 3: 2.71 ms per loop
100 loops, best of 3: 2.49 ms per loop
In [246]: %timeit interp_based(a,N=50)
...: %timeit ranges_based(a,N=50)
...: %timeit ranges_based_v2(a,N=50)
10 loops, best of 3: 42.8 ms per loop
10 loops, best of 3: 30.1 ms per loop
10 loops, best of 3: 22.2 ms per loop
使用 numexpr
进一步提升性能
我们可以利用 numexpr
来发挥多核优势 -
# https://dev59.com/s1kR5IYBdhLWcg3w7hbE#40624614/ @Divakar
import numexpr as ne
def create_ranges_numexpr(start, stop, N, endpoint=True):
if endpoint==1:
divisor = N-1
else:
divisor = N
s0 = start[:,None]
s1 = stop[:,None]
r = np.arange(N)
return ne.evaluate('((1.0/divisor) * (s1 - s0))*r + s0')
def ranges_based_v3(a,N):
start = a
stop = np.concatenate((a[1:],[0]))
return create_ranges_numexpr(start, stop, N-1, endpoint=False).ravel()[:-N+2]
In [276]: np.random.seed(0)
In [277]: a = np.random.randint(0,10,(100000))
In [278]: %timeit interp_based(a,N=5)
...: %timeit ranges_based(a,N=5)
...: %timeit ranges_based_v2(a,N=5)
...: %timeit ranges_based_v3(a,N=5)
100 loops, best of 3: 3.39 ms per loop
100 loops, best of 3: 2.75 ms per loop
100 loops, best of 3: 2.49 ms per loop
1000 loops, best of 3: 1.17 ms per loop
In [279]: %timeit interp_based(a,N=50)
...: %timeit ranges_based(a,N=50)
...: %timeit ranges_based_v2(a,N=50)
...: %timeit ranges_based_v3(a,N=50)
10 loops, best of 3: 43.1 ms per loop
10 loops, best of 3: 31.3 ms per loop
10 loops, best of 3: 22.3 ms per loop
100 loops, best of 3: 11.4 ms per loop
ranges_based_v2
,它似乎更好。所以,也请你去看看它! - Divakar多核
的版本以进一步提高性能。 - Divakarv=np.vstack([a[:-1],a[1:]])
ls = np.apply_along_axis(lambda x: np.linspace(*x,5),1,v)
mask = np.ones((len(a)-1,5),dtype='bool')
mask[:-1,-1] = 0
output = ls[mask]
output = np.zeros(5*(len(a)-1)-1)
output[:-1] = np.reshape(ls[:,:-1],-1)
output[-1] = a[-1]