为了背景,我正在尝试加速本文中光线追踪方法的实现https://iopscience.iop.org/article/10.1088/1361-6560/ac1f38/pdf。
我有一个函数,它接受传播前的强度和传播结果导致的位移图。然后,结果强度是原始强度逐像素通过位移映射进行位移,其中子像素位移在各自相邻像素之间按比例共享。顺便说一句,这是否可以直接在numpy或其他库中实现,因为我注意到它类似于opencv的remap函数。
import numpy as np
from numba import njit
@njit
def raytrace_range(intensity_0, d_y, d_x):
"""
Args:
intensity_0 (2d numpy array): intensity before propagation
d_y (2d numpy array): Displacement along y in pixels
d_x (2d numpy array): Displacement along x in pixels
Returns:
intensity_z (2d numpy array): intensity after propagation
"""
n_y, n_x = intensity_0.shape
intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
for i in range(n_x):
for j in range(n_y):
i_ij = intensity_0[i, j]
dx_ij=d_x[i,j]
dy_ij=d_y[i,j]
# Always the same from here down
if not dx_ij and not dy_ij:
intensity_z[i,j]+=i_ij
continue
i_new=i
j_new=j
#Calculating displacement bigger than a pixel
if np.abs(dx_ij)>1:
x = np.floor(dx_ij)
i_new=int(i+x)
dx_ij=dx_ij-x
if np.abs(dy_ij)>1:
y = np.floor(dy_ij)
j_new=int(j+y)
dy_ij=dy_ij-y
# Calculating sub-pixel displacement
if 0<=i_new and i_new<n_y and 0<=j_new and j_new<n_x:
intensity_z[i_new,j_new]+=i_ij*(1-np.abs(dx_ij))*(1-np.abs(dy_ij))
if i_new<n_y-1 and dx_ij>=0:
if j_new<n_y-1 and dy_ij>=0:
intensity_z[i_new+1, j_new]+=i_ij*dx_ij*(1-dy_ij)
intensity_z[i_new+1, j_new+1]+=i_ij*dx_ij*dy_ij
intensity_z[i_new, j_new+1]+=i_ij*(1-dx_ij)*dy_ij
if j_new and dy_ij<0:
intensity_z[i_new+1,j_new]+=i_ij*dx_ij*(1-np.abs(dy_ij))
intensity_z[i_new+1,j_new-1]+=i_ij*dx_ij*np.abs(dy_ij)
intensity_z[i_new,j_new-1]+=i_ij*(1-dx_ij)*np.abs(dy_ij)
if i_new and dx_ij<0:
if j_new<n_x-1 and dy_ij>=0:
intensity_z[i_new-1,j_new]+=i_ij*np.abs(dx_ij)*(1-dy_ij)
intensity_z[i_new-1,j_new+1]+=i_ij*np.abs(dx_ij)*dy_ij
intensity_z[i_new,j_new+1]+=i_ij*(1-np.abs(dx_ij))*dy_ij
if j_new and dy_ij<0:
intensity_z[i_new-1,j_new]+=i_ij*np.abs(dx_ij)*(1-np.abs(dy_ij))
intensity_z[i_new-1,j_new-1]+=i_ij*dx_ij*dy_ij
intensity_z[i_new,j_new-1]+=i_ij*(1-np.abs(dx_ij))*np.abs(dy_ij)
return intensity_z
我尝试了几种其他方法,其中这是最快的(在上面的注释# Always the same from here down
之后包括上述代码,为了让问题相对较短,我已省略):
@njit
def raytrace_enumerate(intensity_0, d_y, d_x):
n_y, n_x = intensity_0.shape
intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
for i, i_i in enumerate(intensity_0):
for j, i_ij in enumerate(i_i):
dx_ij=d_x[i,j]
dy_ij=d_y[i,j]
@njit
def raytrace_npndenumerate(intensity_0, d_y, d_x):
n_y, n_x = intensity_0.shape
intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
for (i, j), i_ij in np.ndenumerate(intensity_0):
dx_ij=d_x[i,j]
dy_ij=d_y[i,j]
@njit
def raytrace_zip(intensity_0, d_y, d_x):
n_y, n_x = intensity_0.shape
intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
for i, (i_i, dy_i, dx_i) in enumerate(zip(intensity_0, d_y, d_x)):
for j, (i_ij, dy_ij, dx_ij) in enumerate(zip(i_i, dy_i, dx_i)):
@njit
def raytrace_stack1(idydx):
n_y, _, n_x = idydx.shape
intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
for i, (i_i, dy_i, dx_i) in enumerate(idydx):
for j, (i_ij, dy_ij, dx_ij) in enumerate(zip(i_i, dy_i, dx_i)):
@njit
def raytrace_stack2(idydx):
n_y, n_x, _ = idydx.shape
intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
for i, k in enumerate(idydx):
for j, (i_ij, dy_ij, dx_ij) in enumerate(k):
制造一些测试数据和时间:
import timeit
rng = np.random.default_rng()
size = (2010, 2000)
margin = 10
test_data = np.pad(10000*rng.random(size=size), margin)
dx = np.pad(10*(rng.random(size=size)-0.5), margin)
dy = np.pad(10*(rng.random(size=size)-0.5), margin)
# Check results are the same
L = [raytrace_range(test_data, dy, dx), raytrace_enumerate(test_data, dy, dx), raytrace_npndenumerate(test_data, dy, dx), raytrace_zip(test_data, dy, dx), raytrace_stack1(np.stack([test_data, dy, dx], axis=1)), raytrace_stack2(np.stack([test_data, dy, dx], axis=2))]
print((np.diff(np.vstack(L).reshape(len(L),-1),axis=0)==0).all())
%timeit raytrace_range(test_data, dy, dx)
%timeit raytrace_enumerate(test_data, dy, dx)
%timeit raytrace_npndenumerate(test_data, dy, dx)
%timeit raytrace_zip(test_data, dy, dx)
%timeit raytrace_stack1(np.stack([test_data, dy, dx], axis=1)) #Note this would be the fastest if the arrays were pre-stacked
%timeit raytrace_stack2(np.stack([test_data, dy, dx], axis=2))
输出:
True
40.4 ms ± 233 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
37.5 ms ± 117 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
46.8 ms ± 112 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
38.6 ms ± 243 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
42 ms ± 234 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) #Note this would be the fastest if the arrays were pre-stacked
47.4 ms ± 203 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
np.empty
或np.empty_like
,它比np.zeros
更快。 - Zaero Dividenp.empty
和np.zeros
来说,实际上有点复杂,尽管np.empty
应该永远不会比np.zeros
慢。请参阅此最近的回答以了解详细解释。 - Jérôme Richardempty
始终比zeros
稍快。我的数据类型混淆可能是编译和我将其与Cython混合使用的原因...这是我的错。当预编译代码并将其保留为模块时,它可以正常工作。 - Zaero Divide