我想编写一个函数,它将获取形状为(N_ROWS,)
的索引lefts
,并创建一个矩阵out = (N_ROWS, N_COLS)
,其中out[i, j] = 1
当且仅当j >= lefts[i]
。以下是在循环中执行此操作的简单示例:
class Looped(Strategy):
def copy(self, lefts):
out = np.zeros([N_ROWS, N_COLS])
for k, l in enumerate(lefts):
out[k, l:] = 1
return out
现在我希望它尽可能快,因此我有不同实现该函数的方法:
- 纯Python循环
- 使用
@njit
的numba - 一个纯C++实现,我用
ctypes
调用
以下是100次运行的平均结果:
Looped took 0.0011599776260009093
Numba took 8.886413300206186e-05
CPP took 0.00013200821400096175
Numba的运行速度比下一个最快的实现——C++快约1.5倍,为什么呢?
- 在类似的问题中,我听说Cython速度较慢是因为它没有使用所有优化标志进行编译,但是C++实现是用
-O3
编译的,这样足以让我获得编译器可以给出的所有可能的优化吗? - 我不完全理解如何把NumPy数组传递给C++,我是否无意中复制了数据?
# numba implementation
@njit
def numba_copy(lefts):
out = np.zeros((N_ROWS, N_COLS), dtype=np.float32)
for k, l in enumerate(lefts):
out[k, l:] = 1.
return out
class Numba(Strategy):
def __init__(self) -> None:
# avoid compilation time when timing
numba_copy(np.array([1]))
def copy(self, lefts):
return numba_copy(lefts)
// array copy cpp
extern "C" void copy(const long *lefts, float *outdatav, int n_rows, int n_cols)
{
for (int i = 0; i < n_rows; i++) {
for (int j = lefts[i]; j < n_cols; j++){
outdatav[i*n_cols + j] = 1.;
}
}
}
// compiled to a .so using g++ -O3 -shared -o array_copy.so array_copy.cpp
# using cpp implementation
class CPP(Strategy):
def __init__(self) -> None:
lib = ctypes.cdll.LoadLibrary("./array_copy.so")
fun = lib.copy
fun.restype = None
fun.argtypes = [
ndpointer(ctypes.c_long, flags="C_CONTIGUOUS"),
ndpointer(ctypes.c_float, flags="C_CONTIGUOUS"),
ctypes.c_long,
ctypes.c_long,
]
self.fun = fun
def copy(self, lefts):
outdata = np.zeros((N_ROWS, N_COLS), dtype=np.float32, )
self.fun(lefts, outdata, N_ROWS, N_COLS)
return outdata
带有时间等信息的完整代码:
import time
import ctypes
from itertools import combinations
import numpy as np
from numpy.ctypeslib import ndpointer
from numba import njit
N_ROWS = 1000
N_COLS = 1000
class Strategy:
def copy(self, lefts):
raise NotImplementedError
def __call__(self, lefts):
s = time.perf_counter()
n = 1000
for _ in range(n):
out = self.copy(lefts)
print(f"{type(self).__name__} took {(time.perf_counter() - s)/n}")
return out
class Looped(Strategy):
def copy(self, lefts):
out = np.zeros([N_ROWS, N_COLS])
for k, l in enumerate(lefts):
out[k, l:] = 1
return out
@njit
def numba_copy(lefts):
out = np.zeros((N_ROWS, N_COLS), dtype=np.float32)
for k, l in enumerate(lefts):
out[k, l:] = 1.
return out
class Numba(Strategy):
def __init__(self) -> None:
numba_copy(np.array([1]))
def copy(self, lefts):
return numba_copy(lefts)
class CPP(Strategy):
def __init__(self) -> None:
lib = ctypes.cdll.LoadLibrary("./array_copy.so")
fun = lib.copy
fun.restype = None
fun.argtypes = [
ndpointer(ctypes.c_long, flags="C_CONTIGUOUS"),
ndpointer(ctypes.c_float, flags="C_CONTIGUOUS"),
ctypes.c_long,
ctypes.c_long,
]
self.fun = fun
def copy(self, lefts):
outdata = np.zeros((N_ROWS, N_COLS), dtype=np.float32, )
self.fun(lefts, outdata, N_ROWS, N_COLS)
return outdata
def copy_over(lefts):
strategies = [Looped(), Numba(), CPP()]
outs = []
for strategy in strategies:
o = strategy(lefts)
outs.append(o)
for s_0, s_1 in combinations(outs, 2):
for a, b in zip(s_0, s_1):
np.testing.assert_allclose(a, b)
if __name__ == "__main__":
copy_over(np.random.randint(0, N_COLS, size=N_ROWS))