使用numba迭代多个2d numpy数组的最快方法

7
当使用numba并访问多个2d numpy数组中的元素时,是使用索引还是直接迭代数组更好,因为我发现两者的结合是最快的,这似乎与直觉相反?或者有其他更好的方法吗?
为了背景,我正在尝试加速本文中光线追踪方法的实现https://iopscience.iop.org/article/10.1088/1361-6560/ac1f38/pdf
我有一个函数,它接受传播前的强度和传播结果导致的位移图。然后,结果强度是原始强度逐像素通过位移映射进行位移,其中子像素位移在各自相邻像素之间按比例共享。顺便说一句,这是否可以直接在numpy或其他库中实现,因为我注意到它类似于opencv的remap函数。
import numpy as np
from numba import njit

@njit
def raytrace_range(intensity_0, d_y, d_x):
    """

    Args:

        intensity_0 (2d numpy array): intensity before propagation
        d_y (2d numpy array): Displacement along y in pixels
        d_x (2d numpy array): Displacement along x in pixels

    Returns:
        intensity_z (2d numpy array): intensity after propagation 

    """
    n_y, n_x = intensity_0.shape
    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
    for i in range(n_x):
        for j in range(n_y):
            i_ij = intensity_0[i, j]
            dx_ij=d_x[i,j]
            dy_ij=d_y[i,j]


            # Always the same from here down
            if not dx_ij and not dy_ij:
                intensity_z[i,j]+=i_ij
                continue
            i_new=i
            j_new=j
            #Calculating displacement bigger than a pixel
            if np.abs(dx_ij)>1:
                x = np.floor(dx_ij)
                i_new=int(i+x)
                dx_ij=dx_ij-x
            if np.abs(dy_ij)>1:
                y = np.floor(dy_ij)
                j_new=int(j+y)
                dy_ij=dy_ij-y
            # Calculating sub-pixel displacement
            if 0<=i_new and i_new<n_y and 0<=j_new and j_new<n_x:
                intensity_z[i_new,j_new]+=i_ij*(1-np.abs(dx_ij))*(1-np.abs(dy_ij))
                if i_new<n_y-1 and dx_ij>=0:
                    if j_new<n_y-1 and dy_ij>=0:
                        intensity_z[i_new+1, j_new]+=i_ij*dx_ij*(1-dy_ij)
                        intensity_z[i_new+1, j_new+1]+=i_ij*dx_ij*dy_ij
                        intensity_z[i_new, j_new+1]+=i_ij*(1-dx_ij)*dy_ij
                    if j_new and dy_ij<0:
                        intensity_z[i_new+1,j_new]+=i_ij*dx_ij*(1-np.abs(dy_ij))
                        intensity_z[i_new+1,j_new-1]+=i_ij*dx_ij*np.abs(dy_ij)
                        intensity_z[i_new,j_new-1]+=i_ij*(1-dx_ij)*np.abs(dy_ij)
                if i_new and dx_ij<0:
                    if j_new<n_x-1 and dy_ij>=0:
                        intensity_z[i_new-1,j_new]+=i_ij*np.abs(dx_ij)*(1-dy_ij)
                        intensity_z[i_new-1,j_new+1]+=i_ij*np.abs(dx_ij)*dy_ij
                        intensity_z[i_new,j_new+1]+=i_ij*(1-np.abs(dx_ij))*dy_ij
                    if j_new and dy_ij<0:
                        intensity_z[i_new-1,j_new]+=i_ij*np.abs(dx_ij)*(1-np.abs(dy_ij))
                        intensity_z[i_new-1,j_new-1]+=i_ij*dx_ij*dy_ij
                        intensity_z[i_new,j_new-1]+=i_ij*(1-np.abs(dx_ij))*np.abs(dy_ij)
    return intensity_z

我尝试了几种其他方法,其中这是最快的(在上面的注释# Always the same from here down之后包括上述代码,为了让问题相对较短,我已省略):

@njit
def raytrace_enumerate(intensity_0, d_y, d_x):
    n_y, n_x = intensity_0.shape
    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
    for i, i_i in enumerate(intensity_0):
        for j, i_ij in enumerate(i_i):
            dx_ij=d_x[i,j]
            dy_ij=d_y[i,j]

@njit
def raytrace_npndenumerate(intensity_0, d_y, d_x):
    n_y, n_x = intensity_0.shape
    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
    for (i, j), i_ij  in np.ndenumerate(intensity_0):
            dx_ij=d_x[i,j]
            dy_ij=d_y[i,j]

@njit
def raytrace_zip(intensity_0, d_y, d_x):
    n_y, n_x = intensity_0.shape
    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
    for i, (i_i, dy_i, dx_i) in enumerate(zip(intensity_0, d_y, d_x)):
        for j, (i_ij, dy_ij, dx_ij) in enumerate(zip(i_i, dy_i, dx_i)):

@njit
def raytrace_stack1(idydx):
    n_y, _, n_x = idydx.shape
    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
    for i, (i_i, dy_i, dx_i) in enumerate(idydx):
        for j, (i_ij, dy_ij, dx_ij) in enumerate(zip(i_i, dy_i, dx_i)):

@njit
def raytrace_stack2(idydx):
    n_y, n_x, _ = idydx.shape
    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)
    for i, k in enumerate(idydx):
        for j, (i_ij, dy_ij, dx_ij) in enumerate(k):

制造一些测试数据和时间:

import timeit
rng = np.random.default_rng()
size = (2010, 2000)
margin = 10
test_data = np.pad(10000*rng.random(size=size), margin)
dx = np.pad(10*(rng.random(size=size)-0.5), margin)
dy = np.pad(10*(rng.random(size=size)-0.5), margin)

# Check results are the same
L = [raytrace_range(test_data, dy, dx), raytrace_enumerate(test_data, dy, dx), raytrace_npndenumerate(test_data, dy, dx), raytrace_zip(test_data, dy, dx), raytrace_stack1(np.stack([test_data, dy, dx], axis=1)), raytrace_stack2(np.stack([test_data, dy, dx], axis=2))]
print((np.diff(np.vstack(L).reshape(len(L),-1),axis=0)==0).all())

%timeit raytrace_range(test_data, dy, dx)
%timeit raytrace_enumerate(test_data, dy, dx)
%timeit raytrace_npndenumerate(test_data, dy, dx)
%timeit raytrace_zip(test_data, dy, dx)
%timeit raytrace_stack1(np.stack([test_data, dy, dx], axis=1)) #Note this would be the fastest if the arrays were pre-stacked
%timeit raytrace_stack2(np.stack([test_data, dy, dx], axis=2))

输出:

True
40.4 ms ± 233 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
37.5 ms ± 117 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
46.8 ms ± 112 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
38.6 ms ± 243 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
42 ms ± 234 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) #Note this would be the fastest if the arrays were pre-stacked
47.4 ms ± 203 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

1
点赞这个问题。很少见到精心阐述的问题。一段时间以前,numba可以从声明变量类型中受益,我不确定现在是否仍然如此。 - Zaero Divide
顺便提一下,看起来你正在用零填充数组,但稍后会被覆盖。如果是这种情况,请使用 np.emptynp.empty_like,它比 np.zeros 更快。 - Zaero Divide
@ZaeroDivide 对于数据类型来说,为Numba提供签名是很有意义的,这样它就可以急切地编译函数而不是懒惰地编译。一旦函数被编译,这就没有任何影响了。由于类型推断,Numba JIT (LLVM-Lite) 不需要更多信息。 - Jérôme Richard
@ZaeroDivide 对于np.emptynp.zeros来说,实际上有点复杂,尽管np.empty应该永远不会比np.zeros慢。请参阅此最近的回答以了解详细解释。 - Jérôme Richard
感谢@Nin17和@Jérome-richard。实际上,在我的测试中,empty始终比zeros稍快。我的数据类型混淆可能是编译和我将其与Cython混合使用的原因...这是我的错。当预编译代码并将其保留为模块时,它可以正常工作。 - Zaero Divide
显示剩余7条评论
2个回答

2

编辑3:事实证明,去掉if语句后,rangeenumerate更快。请参见下面的第2次编辑。

有趣的是,在我的机器上,stack1stack2选项的时间变得糟糕了,而且enumerate似乎最快。也许多亏了enumerate,numba理解它是一个循环变量吧?

In [1]: %timeit raytrace_range(test_data, dy, dx)
   ...: %timeit raytrace_enumerate(test_data, dy, dx)
   ...: %timeit raytrace_npndenumerate(test_data, dy, dx)
   ...: %timeit raytrace_zip(test_data, dy, dx)
   ...: %timeit raytrace_stack1(np.stack([test_data, dy, dx], axis=1)) #Note this would be the fastest if the arrays we
   ...: re pre-stacked
   ...: %timeit raytrace_stack2(np.stack([test_data, dy, dx], axis=2))
61 ms ± 785 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
53.9 ms ± 998 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
69.9 ms ± 471 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
57.5 ms ± 1.07 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
109 ms ± 478 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
146 ms ± 1.67 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

编辑:使用fastmath=True并没有显著提高速度,仅仅只有约3毫秒。

编辑2:虽然这与OP的问题无关,但在尝试过函数后,发现删除“多余”的(*)条件语句可以显著提高速度。在我的机器上大约提升了20%。结果表明,即使没有它们(至少提供的测试返回True),该实现也可以正常工作:

(*) 这些操作似乎仍然有效,因为它们被较低级别的操作“捕获”。至少,提供的测试向量没有报告任何问题。

#! Using this it is faster:
# Always the same from here down
# if dx_ij==0 and dy_ij==0:
#     intensity_z[i,j]+=i_ij
#     continue
#Calculating displacement bigger than a pixel
x = np.floor(dx_ij)
i_new=int(i+x)
dx_ij=dx_ij-x
y = np.floor(dy_ij)
j_new=int(j+y)
dy_ij=dy_ij-y
# Calculating sub-pixel displacement


In [2]: %timeit raytrace_range(test_data, dy, dx)
   ...: %timeit raytrace_range2(test_data, dy, dx)
   ...: %timeit raytrace_enumerate(test_data, dy, dx)
64.8 ms ± 1.37 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
52.9 ms ± 1.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
56.1 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

我同意,测试np.abs(dx_ij)>1np.abs(dy_ij)>1的if语句是多余的 - 我已经从原始函数中删除了一些这样的if语句,但错过了那些,但是在实际数据中,dx_ij==0 and dy_ij==0是相当常见的情况,所以保留它可能会更快。 - Nin17
我建议尝试一下,有时候检查和分支需要的时钟周期比执行操作还要多。撇开管道、缓存和其他超出我的知识范围的东西。 - Zaero Divide

2

通常情况下,迭代数组的最快方式是使用基本的低级整数迭代器。这种模式在Numba中引起的转换次数最少,因此编译器应该能够优化代码。类似于 zipenumerate 的函数往往会增加额外的开销,因为存在无法完美优化的间接代码转换。

以下是一个基本示例:

import numba as nb

@nb.njit('(int_[::1],)')
def test(arr):
    s1 = s2 = 0
    for i in range(arr.shape[0]):
        s1 += i
        s2 += arr[i]
    return (s1, s2)

arr = np.arange(200_000)
test(arr)

然而,当您同时读取/写入多个数组时(这是您的情况),情况就更加复杂了。实际上,Numpy数组可以使用负索引进行索引,因此Numba需要每次执行边界检查。与实际访问相比,这种检查是昂贵的,甚至可能破坏其他优化(例如向量化)。
因此,Numba已经进行了优化,以分析代码并检测不需要边界检查的情况,并防止在运行时添加昂贵的检查。在上面的代码中就是这种情况,但在您的raytrace_range函数中却不是。 "enumerate" 和 "enumerate"+"zip" 可以大大帮助去除边界检查,因为Numba可以轻松证明索引位于数组的边界之内(理论上,它可以证明 "raytrace_range" 的情况,但当前实现还不够聪明)。
您可以使用 "assertions" 来解决这个问题。这不仅对优化有好处,也使您的代码更加健壮!
此外,底层JIT(LLVM-Lite)有时无法完全优化多维数组的索引。它们没有不被优化的理由,但编译器使用启发式方法来优化代码,这些方法远非完美(尽管平均而言相当不错)。您可以通过计算线的视图来帮助优化,这通常会带来微小的改进。
以下是改进后的代码:
@njit
def raytrace_range_opt(intensity_0, d_y, d_x):
    n_y, n_x = intensity_0.shape
    assert intensity_0.shape == d_y.shape
    assert intensity_0.shape == d_x.shape

    intensity_z = np.zeros((n_y, n_x), dtype=np.float64)

    for i in range(n_x):
        row_intensity_0 = intensity_0[i, :]
        row_d_x = d_x[i, :]
        row_d_y = d_y[i, :]

        for j in range(n_y):
            assert j >= 0  # Crazy optimization (see later)

            i_ij = row_intensity_0[j]
            dx_ij = row_d_x[j]
            dy_ij = row_d_y[j]

            # Always the same from here down
            if not dx_ij and not dy_ij:
                row_intensity_0[j] += i_ij
                continue

            # Remaining code left unmodified

注意事项

请注意,我认为函数raytrace_enumerate的索引是错误的:它应该是for i in range(n_y): for j in range(n_x):,因为访问是使用intensity_0[i, j]进行的,而你写的是n_y, n_x = intensity_0.shape。请注意,交换轴也可以根据您的验证函数给出正确的结果(这很可疑)。

assert j >= 0指令就可以使速度提高8%,这太不可思议了,因为如果n_x为正,整数迭代器j保证为正,而这始终是情况,因为它是一个形状!这显然是Numba的一个疏忽,LLVM-Lite无法优化(因为LLVM-Lite不知道什么是形状,以及它们总是正的)。Numba代码中这个明显的缺失假设导致了额外的边界检查(三个数组中的每一个),这些检查非常昂贵。


基准测试

下面是我的机器上的结果:

raytrace_range:           47.8 ms ± 265 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raytrace_enumerate:       38.9 ms ± 208 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raytrace_npndenumerate:   54.1 ms ± 363 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raytrace_zip:             41 ms ± 657 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raytrace_stack1:          86.7 ms ± 268 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
raytrace_stack2:          84 ms ± 432 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

raytrace_range_opt:       38.6 ms ± 421 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

正如您所看到的,在我的机器上,raytrace_range_opt 是最快的实现


这不就是和我的原始代码 raytrace_range 一样的吗(只不过加了'(int_[::1],)')?而且在性能上被enumerate和zip两者都超越了吧? - Nin17
@ZaeroDivide 我不确定这是否是过早的优化。如果 OP 使用了一个有很多零的数组,那么使用这个条件是有意义的,因为代码的很大一部分将被跳过。随机数据集可能不代表 OP 实际使用的输入。我们还不知道 OP 是否实际上有一个有用的条件输入。到目前为止,我们只能猜测。 - Jérôme Richard
1
我同意,if np.abs(dx_ij)>1 and if np.abs(dy_ij)>1是多余的,但在真实数据中,if not dx_ij and not dy_ij是常见的情况。我很想听听您认为代码的其余部分可以进行哪些优化,我已经将索引从[i, j]更改为[i][j],因为我发现这样更快。 - Nin17
好的,我会研究一下索引,但你可能是对的。至于assert j >= 0,我发现这没有任何影响,也许这取决于numba版本?我正在使用0.55.1。至于我的测试函数,我直接复制了它以回答这个问题,所以我没有进行过多的测试。 - Nin17
1
顺便提一下,现代处理器使用分支预测来提高速度。这意味着像你这样有很多条件的算法会受到这种行为的影响,数据集也会对结果性能产生很大影响。这是一个非常复杂的话题。欲了解更多信息,请阅读这篇优秀的文章。我同意ZaeroDivide关于测试“真实数据场景”的观点,但设置起来可能会相当困难。至于性能,需要注意的是SIMD是获得高性能的关键,但在这里使代码适应SIMD很困难。 - Jérôme Richard
显示剩余12条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接