Python Numpy与Matlab:数组赋值性能

5
我已经写了多年的Matlab代码,最近开始写Python。让我试着解释一下我面临的问题:
我的代码的一部分将大数组中的单元格(例如大小为1080x1400的图像)与较小的数组(大小为770x700的网格)关联起来。大数组中的所有单元格都可以与整个网格或较小的部分相关联,这意味着大量大数组中的单元格可能与小数组中的同一单元格相关联。我编写了两组代码,一组是Matlab,另一组是Python。
由于某种原因,Matlab代码平均运行时间为41毫秒,而Python代码在Pycharm中平均运行时间为4.1秒(均测量100次)。有什么办法可以大幅提高Numpy的性能吗?
虽然我总是以向量化形式编写,但在这种情况下,代码是用for循环编写的,我认为这是适当的。
谢谢
示例输入数据链接:

https://technionmail-my.sharepoint.com/:x:/g/personal/barakp_campus_technion_ac_il/Eb-JELhUNslJm219qI6bflEBxEv3XnOsGTQaTZN7GfzUbA?e=CeUjRT

https://technionmail-my.sharepoint.com/:i:/g/personal/barakp_campus_technion_ac_il/ETOjjmtzedpBi9YMKfI7778Bz3also9U9acvosMM1gKK0w?e=cQ4afV

Matlab 代码:

%%
clear;clc;
InputCoord = readmatrix('InputCoord.csv');
%%
Wx = InputCoord(:,3)' + 1;
Wy = InputCoord(:,4)' + 1;
OutMtx = zeros(770,770);

%%
fp_Row = InputCoord(:,1)' + 1;
fp_Col = InputCoord(:,2)' + 1;
DataMtx = single(imread('DataMtx.tif'))./255;
%%
number_of_times = 100;
t_stop = zeros(number_of_times,1);
for jj = 1:number_of_times
    N = 1;
    t_start = tic;
    for ii = 1:size(Wx,2)
        Wx_ind = Wx(ii);
        Wy_ind = Wy(ii);
        fp_Row_ind = fp_Row(ii);
        fp_Col_ind = fp_Col(ii);
        if ii>1 && (Wx(ii)~=Wx(ii-1) || Wy(ii)~=Wy(ii-1))
            N = 1;
        end
        
        OutMtx(Wx_ind, Wy_ind) = ((N-1)*OutMtx(Wx_ind, Wy_ind) + DataMtx(fp_Row_ind, fp_Col_ind))/N;
        N = N + 1;
    end
    
    t_stop(jj) = toc(t_start);
end

Python 代码:

import numpy as np
import cv2
import time


InputCoord = np.genfromtxt('InputCoord.csv', delimiter=',')
number_of_coords = np.shape(InputCoord)[0]
Wx = InputCoord[:, 2].astype(dtype=np.int32).reshape((1, number_of_coords))
Wy = InputCoord[:, 3].astype(dtype=np.int32).reshape((1, number_of_coords))
OutMtx = np.zeros((770, 770))

fp_Row = InputCoord[:, 0].astype(dtype=np.int32).reshape((1, number_of_coords))
fp_Col = InputCoord[:, 1].astype(dtype=np.int32).reshape((1, number_of_coords))
DataMtx = cv2.imread('DataMtx.tif', -1).astype(dtype=np.float32) / 255
# print(f' DataMtx flags:{DataMtx.flags}')
DataMtxf = np.asarray(DataMtx, order='F')
number_of_times = 100
t_stop = np.zeros((1, number_of_times))
for jj in range(number_of_times):
    t_start = time.time()
    N = 1
    for ii in range(number_of_coords):
        Wx_ind = Wx[0, ii]
        Wy_ind = Wy[0, ii]
        fp_Row_ind = fp_Row[0, ii]
        fp_Col_ind = fp_Col[0, ii]
        if (ii > 1) and ((Wx[0, ii] != Wx[0, ii - 1]) or (Wy[0, ii] != Wy[0, ii - 1])):
            N = 1

        OutMtx[Wx_ind, Wy_ind] = ((N - 1) * OutMtx[Wx_ind, Wy_ind] + DataMtx[fp_Row_ind, fp_Col_ind]) / N
        N = N + 1
    t_stop[0, jj] = time.time() - t_start
print(f'mean update time = {np.mean(t_stop)}')

这可能有些过度,但如果你真的想提高numpy的性能,那么你可以通过使用C扩展来扩展你的Python代码,并创建一个使用C进行优化并行实现。我知道这会引入更多的复杂性,但如果你的代码确实依赖于更快的速度,那么这可能值得一试。这里有一篇有用的文章,可以帮助你入门。https://medium.com/analytics-vidhya/beating-numpy-performance-by-extending-python-with-c-c9b644ee2ca8 - A Webb
1
你的[mre]应该包括示例数据:WxWyfp_Rowfp_ColDataMtxf。你的Python代码是否实现了你想要的功能? - wwii
MATLAB进行某种jit编译,因此您可以使用ii迭代。 numpy不会这样做。尽可能矢量化(就像我多年前在MATLAB中所做的那样)。或者使用numba创建已编译版本。 - hpaulj
1
你可能会更有帮助,如果使用简化版本的ii循环和[mcve]值。在numpy中,像reshape((1, number_of_coords))Wx[0, ii]这样的东西看起来像是从MATLAB中继承过来的。它们不会影响性能,但会使代码变得混乱。但是,N的迭代性质可能是通过使用整个数组的numpy操作(“向量化”)加速代码的最大障碍。我对此的情况没有清晰的认识。 - hpaulj
Wx,Wy,fp_Row,fp_Col是与帖子中共享的关联数组的坐标,它们的大小为1440x1080=1555200。DataMtxf只是我尝试将内存顺序从行式(C)更改为列式(Fortran)的一种方法。我询问的加速是ii for循环,代码实现了我的要求。ii for循环计算了与同一个OutMtx单元格相关联的DataMtx单元格的移动平均值,可以通过使用逻辑索引创建掩码并对DataMtx单元格执行平均值来部分矢量化,但仍然需要for循环。 - BarakP
1
“shared in links” - 你的 [mre] 中的示例数据应该在问题中提供,我们不应该从外部资源获取它。我同意 @hpaulj 的评论。 - wwii
1个回答

2

问题已解决:

我使用了numba进行jit编译,现在Python代码的平均运行时间为17毫秒!!

感谢。

import numpy as np
import cv2
import time
import numba

@numba.jit(nopython=True)
def Pix2Grid_MovAvg(DataMtx, OutMtx, Wx, Wy, fp_Row, fp_Col, number_of_coords):
    N = 1

    for ii in range(number_of_coords):
        Wx_ind = Wx[0, ii]
        Wy_ind = Wy[0, ii]
        fp_Row_ind = fp_Row[0, ii]
        fp_Col_ind = fp_Col[0, ii]
        if (ii > 1) and ((Wx[0, ii] != Wx[0, ii - 1]) or (Wy[0, ii] != Wy[0, ii - 1])):
            N = 1

        OutMtx[Wx_ind, Wy_ind] = ((N - 1) * OutMtx[Wx_ind, Wy_ind] + DataMtx[fp_Row_ind, fp_Col_ind]) / N
        N += 1
    return OutMtx


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接