T(i) = Tm(i) + (T(i-1)-Tm(i))**(-tau(i))
Tm
和tau
是之前计算过的NumPy向量,且长度相同,目标是创建一个新的向量T
。其中的i
只是为了指示所需元素的索引。在这种情况下,需要使用for循环吗?
T(i) = Tm(i) + (T(i-1)-Tm(i))**(-tau(i))
Tm
和tau
是之前计算过的NumPy向量,且长度相同,目标是创建一个新的向量T
。其中的i
只是为了指示所需元素的索引。import numpy as np
n = len(Tm)
t = np.empty(n)
t[0] = 0 # or whatever the initial condition is
t[1:] = Tm[1:] + (t[0:n-1] - Tm[1:])**(-tau[1:])
tt = np.empty(n)
tt[0] = 0.
for i in range(1,n):
tt[i] = Tm[i] + (tt[i-1] - Tm[i])**(-tau[i])
2019更新。 Numba代码在新版本的numba中出现了故障。将dtype =“float32”
更改为dtype = np.float32
即可解决。
我进行了一些基准测试,发现在2019年使用Numba是人们加速Numpy递归函数的首选方法(Aronstef的调整建议)。Numba已经预安装在Anaconda包中,并且具有最快的时间(大约比任何Python快20倍)。在2019年,Python支持@numba注释而无需其他步骤(至少版本3.6、3.7和3.8)。以下是三个基准测试:分别于2019-12-05、2018-10-20和2016-05-18执行。
正如Jaffe所提到的,在2018年仍然不可能矢量化递归函数。我检查了Aronstef的矢量化,但它不起作用。
按执行时间排序的基准测试:
-------------------------------------------
|Variant |2019-12 |2018-10 |2016-05 |
-------------------------------------------
|Pure C | na | na | 2.75 ms|
|C extension | na | na | 6.22 ms|
|Cython float32 | 0.55 ms| 1.01 ms| na |
|Cython float64 | 0.54 ms| 1.05 ms| 6.26 ms|
|Fortran f2py | 4.65 ms| na | 6.78 ms|
|Numba float32 |73.0 ms| 2.81 ms| na |
|(Aronstef) | | | |
|Numba float32v2| 1.82 ms| 2.81 ms| na |
|Numba float64 |78.9 ms| 5.28 ms| na |
|Numba float64v2| 4.49 ms| 5.28 ms| na |
|Append to list |73.3 ms|48.2 ms|91.0 ms|
|Using a.item() |36.9 ms|58.3 ms|74.4 ms|
|np.fromiter() |60.8 ms|60.0 ms|78.1 ms|
|Loop over Numpy|71.3 ms|71.9 ms|87.9 ms|
|(Jaffe) | | | |
|Loop over Numpy|74.6 ms|74.4 ms| na |
|(Aronstef) | | | |
-------------------------------------------
回答末尾提供了相应的代码。
看起来随着时间的推移,Numba和Cython的时间变得更好了。现在它们两个都比Fortran f2py更快。 Cython现在快了8.6倍,而Numba 32位则快了2.5倍。2016年,Fortran非常难以调试和编译,所以现在没有理由再使用Fortran了。
我在2019年和2018年没有检查过Pure C和C扩展,因为在Jupyter笔记本中编译它们不容易。
我在2019年的设置如下:
Processor: Intel i5-9600K 3.70GHz
Versions:
Python: 3.8.0
Numba: 0.46.0
Cython: 0.29.14
Numpy: 1.17.4
我在2018年有以下的设置:
Processor: Intel i7-7500U 2.7GHz
Versions:
Python: 3.7.0
Numba: 0.39.0
Cython: 0.28.5
Numpy: 1.15.1
@numba.jit("float32[:](float32[:], float32[:])", nopython=True, nogil=True)
def calc_py_jit32v2(Tm_, tau_):
tt = np.empty(len(Tm_),dtype=np.float32)
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])
return tt[1:]
所有其他代码:
数据创建(例如Aronstef + Mike T的评论):
np.random.seed(0)
n = 100000
Tm = np.cumsum(np.random.uniform(0.1, 1, size=n).astype('float64'))
tau = np.random.uniform(-1, 0, size=n).astype('float64')
ar = np.column_stack([Tm,tau])
Tm32 = Tm.astype('float32')
tau32 = tau.astype('float32')
Tm_l = list(Tm)
tau_l = list(tau)
2016年的代码略有不同,我使用了abs()函数来防止nans,而不是Mike T的变量。在2018年,该函数与OP(原始帖子)编写的完全相同。
Cython float32使用Jupyter %%魔法。该函数可以直接在Python
中使用。Cython需要一个C++编译器,Python就是用这个编译器编译的。安装正确版本的Visual C++编译器(对于Windows)可能会有问题:
%%cython
import cython
import numpy as np
cimport numpy as np
from numpy cimport ndarray
cdef extern from "math.h":
np.float32_t exp(np.float32_t m)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
@cython.initializedcheck(False)
def cy_loop32(np.float32_t[:] Tm,np.float32_t[:] tau,int alen):
cdef np.float32_t[:] T=np.empty(alen, dtype=np.float32)
cdef int i
T[0]=0.0
for i in range(1,alen):
T[i] = Tm[i] + (T[i-1] - Tm[i])**(-tau[i])
return T
%%cython
cdef extern from "math.h":
double exp(double m)
import cython
import numpy as np
cimport numpy as np
from numpy cimport ndarray
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
@cython.initializedcheck(False)
def cy_loop(double[:] Tm,double[:] tau,int alen):
cdef double[:] T=np.empty(alen)
cdef int i
T[0]=0.0
for i in range(1,alen):
T[i] = Tm[i] + (T[i-1] - Tm[i])**(-tau[i])
return T
Numba float64:
@numba.jit("float64[:](float64[:], float64[:])", nopython=False, nogil=True)
def calc_py_jitv2(Tm_, tau_):
tt = np.empty(len(Tm_),dtype=np.float64)
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])
return tt[1:]
追加到列表。最快的非编译解决方案:
def rec_py_loop(Tm,tau,alen):
T = [Tm[0]]
for i in range(1,alen):
T.append(Tm[i] - (T[i-1] + Tm[i])**(-tau[i]))
return np.array(T)
使用a.item()方法:
def rec_numpy_loop_item(Tm_,tau_):
n_ = len(Tm_)
tt=np.empty(n_)
Ti=tt.item
Tis=tt.itemset
Tmi=Tm_.item
taui=tau_.item
Tis(0,Tm_[0])
for i in range(1,n_):
Tis(i,Tmi(i) - (Ti(i-1) + Tmi(i))**(-taui(i)))
return tt[1:]
np.fromiter():
def it(Tm,tau):
T=Tm[0]
i=0
while True:
yield T
i+=1
T=Tm[i] - (T + Tm[i])**(-tau[i])
def rec_numpy_iter(Tm,tau,alen):
return np.fromiter(it(Tm,tau), np.float64, alen)[1:]
Numpy中的循环(基于Jaffe的想法):
def rec_numpy_loop(Tm,tau,alen):
tt=np.empty(alen)
tt[0]=Tm[0]
for i in range(1,alen):
tt[i] = Tm[i] - (tt[i-1] + Tm[i])**(-tau[i])
return tt[1:]
循环遍历Numpy(Aronstef的代码)。 在我的电脑上,np.empty
的默认类型是float64
。
def calc_py(Tm_, tau_):
tt = np.empty(len(Tm_),dtype="float64")
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = (Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i]))
return tt[1:]
纯C代码,完全不使用Python
。这是2016年的版本(包含fabs()函数):
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <windows.h>
#include <sys\timeb.h>
double randn() {
double u = rand();
if (u > 0.5) {
return sqrt(-1.57079632679*log(1.0 - pow(2.0 * u - 1, 2)));
}
else {
return -sqrt(-1.57079632679*log(1.0 - pow(1 - 2.0 * u,2)));
}
}
void rec_pure_c(double *Tm, double *tau, int alen, double *T)
{
for (int i = 1; i < alen; i++)
{
T[i] = Tm[i] + pow(fabs(T[i - 1] - Tm[i]), (-tau[i]));
}
}
int main() {
int N = 100000;
double *Tm= calloc(N, sizeof *Tm);
double *tau = calloc(N, sizeof *tau);
double *T = calloc(N, sizeof *T);
double time = 0;
double sumtime = 0;
for (int i = 0; i < N; i++)
{
Tm[i] = randn();
tau[i] = randn();
}
LARGE_INTEGER StartingTime, EndingTime, ElapsedMicroseconds;
LARGE_INTEGER Frequency;
for (int j = 0; j < 1000; j++)
{
for (int i = 0; i < 3; i++)
{
QueryPerformanceFrequency(&Frequency);
QueryPerformanceCounter(&StartingTime);
rec_pure_c(Tm, tau, N, T);
QueryPerformanceCounter(&EndingTime);
ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart;
ElapsedMicroseconds.QuadPart *= 1000000;
ElapsedMicroseconds.QuadPart /= Frequency.QuadPart;
if (i == 0)
time = (double)ElapsedMicroseconds.QuadPart / 1000;
else {
if (time > (double)ElapsedMicroseconds.QuadPart / 1000)
time = (double)ElapsedMicroseconds.QuadPart / 1000;
}
}
sumtime += time;
}
printf("1000 loops,best of 3: %.3f ms per loop\n",sumtime/1000);
free(Tm);
free(tau);
free(T);
}
Fortran f2py. 这个函数可以从 Python
中使用。它是2016年版本的(带有 abs() 函数):
subroutine rec_fortran(tm,tau,alen,result)
integer*8, intent(in) :: alen
real*8, dimension(alen), intent(in) :: tm
real*8, dimension(alen), intent(in) :: tau
real*8, dimension(alen) :: res
real*8, dimension(alen), intent(out) :: result
res(1)=0
do i=2,alen
res(i) = tm(i) + (abs(res(i-1) - tm(i)))**(-tau(i))
end do
result=res
end subroutine rec_fortran
更新:2018年10月21日
根据评论,我已经更正了我的回答。
只要计算不是递归的,就可以将对向量的操作向量化。因为递归操作取决于先前计算的值,所以无法并行处理该操作。 因此,这种方法不起作用:
def calc_vect(Tm_, tau_):
return Tm_[1:] - (Tm_[:-1] + Tm_[1:]) ** (-tau_[1:])
由于(串行处理/循环)是必要的,因此最佳性能是通过尽可能接近优化的机器代码来实现的,因此Numba和Cython是最佳选择。
Numba方法可以按以下方式实现:
init_string = """
from math import pow
import numpy as np
from numba import jit, float32
np.random.seed(0)
n = 100000
Tm = np.cumsum(np.random.uniform(0.1, 1, size=n).astype('float32'))
tau = np.random.uniform(-1, 0, size=n).astype('float32')
def calc_python(Tm_, tau_):
tt = np.empty(len(Tm_))
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - pow(tt[i-1] + Tm_[i], -tau_[i])
return tt
@jit(float32[:](float32[:], float32[:]), nopython=False, nogil=True)
def calc_numba(Tm_, tau_):
tt = np.empty(len(Tm_))
tt[0] = Tm_[0]
for i in range(1, len(Tm_)):
tt[i] = Tm_[i] - pow(tt[i-1] + Tm_[i], -tau_[i])
return tt
"""
import timeit
py_time = timeit.timeit('calc_python(Tm, tau)', init_string, number=100)
numba_time = timeit.timeit('calc_numba(Tm, tau)', init_string, number=100)
print("Python Solution: {}".format(py_time))
print("Numba Soltution: {}".format(numba_time))
Python和Numba函数的Timeit比较:
Python Solution: 54.58057559299999
Numba Soltution: 1.1389029540000024
nan
值,因为它使用负底数的实指数。可以使用Tm = np.cumsum(np.random.uniform(0, 1, size=n).astype('f'))
作为解决方法。 - Mike Tnumpy.ufunc.accumulate
正如@Karl Knechtel所提到的那样,这似乎是一个有前途的选择。你需要先创建一个ufunc
。这个网页解释了如何创建。import numpy as np
def test_add(x, data):
return x + data
assert test_add(1, 2) == 3
assert test_add(2, 3) == 5
# Make a Numpy ufunc from my test_add function
test_add_ufunc = np.frompyfunc(test_add, 2, 1)
assert test_add_ufunc(1, 2) == 3
assert test_add_ufunc(2, 3) == 5
assert np.all(test_add_ufunc([1, 2], [2, 3]) == [3, 5])
data_sequence = np.array([1, 2, 3, 4])
f_out = test_add_ufunc.accumulate(data_sequence, dtype=object)
assert np.array_equal(f_out, [1, 3, 6, 10])
[请注意,需要使用dtype=object
参数,如上面链接的网页所解释的那样]
但在您的情况下(以及我的情况下),我们希望计算具有多个数据输入(可能还有多个状态变量)的递归方程。
当我尝试使用以上ufunc.accumulate
方法时,我得到了ValueError: accumulate only supported for binary functions
的错误。
如果有人知道绕过这个限制的方法,我会非常感兴趣。
选项2。Python内置的accumulate函数
与此同时,这个解决方案虽然没有实现您想要的numpy向量化计算,但至少避免了for循环。
from itertools import accumulate, chain
def t_next(t, data):
Tm, tau = data # Unpack more than one data input
return Tm + (t - Tm)**tau
assert t_next(2, (0.38, 0)) == 1.38
t0 = 2 # Initial t
Tm_values = np.array([0.38, 0.88, 0.56, 0.67, 0.45, 0.98, 0.58, 0.72, 0.92, 0.82])
tau_values = np.linspace(0, 0.9, 10)
# Combine the input data into a 2D array
data_sequence = np.vstack([Tm_values, tau_values]).T
t_out = np.fromiter(accumulate(chain([t0], data_sequence), t_next), dtype=float)
print(t_out)
# [2. 1.38 1.81303299 1.60614649 1.65039964 1.52579703
# 1.71878078 1.66109554 1.67839293 1.72152195 1.73091672]
# Slightly more readable version possible in Python 3.8+
t_out = np.fromiter(accumulate(data_sequence, t_next, initial=t0), dtype=float)
print(t_out)
# [2. 1.38 1.81303299 1.60614649 1.65039964 1.52579703
# 1.71878078 1.66109554 1.67839293 1.72152195 1.73091672]
继续NPE的回答,我同意必须要有循环。也许你的目标是避免使用Python for循环的开销?在这种情况下,numpy.fromiter可以击败for循环,但只能稍微快一点:
使用非常简单的递归关系,
x[i+1] = x[i] + 0.1
我理解
#FOR LOOP
def loopit(n):
x = [0.0]
for i in range(n-1): x.append(x[-1] + 0.1)
return np.array(x)
#FROMITER
#define an iterator (a better way probably exists -- I'm a novice)
def it():
x = 0.0
while True:
yield x
x += 0.1
#use the iterator with np.fromiter
def fi_it(n):
return np.fromiter(it(), np.float, n)
%timeit -n 100 loopit(100000)
#100 loops, best of 3: 31.7 ms per loop
%timeit -n 100 fi_it(100000)
#100 loops, best of 3: 18.6 ms per loop
有趣的是,预先分配一个numpy数组会导致性能大幅下降。这对我来说是个谜,但我猜测访问数组元素可能会有更多的开销,而不是添加到列表中。
def loopit(n):
x = np.zeros(n)
for i in range(n-1): x[i+1] = x[i] + 0.1
return x
%timeit -n 100 loopit(100000)
#100 loops, best of 3: 50.1 ms per loop
tau
是一个向量,它也应该由i
进行索引吗? - mtrwi=0
时会发生什么? - Andrew Jaffenumpy
内部发生,而不是在Python中。如果这是真正的问题,那么可以尝试一些有创意的使用numpy.fromiter
的方法。 - NPEnumpy.<ufunc>.accumulate
来完成一些操作? - Karl Knechtel