为了测试你的代码,我去掉了parfor循环并用for循环代替,然后使用MATLAB分析器。 我在测试中使用了
nsims = 500
。
通过使用分析器,我发现你的代码中有两个关键瓶颈。 第一个是最嵌套的for循环(n-loop)中的
find()
函数。 第二个是三次调用
interpn()
函数。 这4行代码使用了计算时间的88%。
![enter image description here](https://istack.dev59.com/qePae.webp)
在这种情况下,由于函数调用的开销(尤其是考虑到嵌套循环中它接收的调用数量),以及内置的错误检查和管理,
find
函数比理想情况下要慢。将
find
函数替换为硬编码的二分搜索(如下所示)可大大提高性能,这仅替换了n循环中的
find
。对于
nsims = 500
,使用
find
的运行时间为
29.8秒。使用二分搜索,运行时间为
12.1秒。注意:这仅适用于数据已排序的情况,此代码无法替换每个实例中的find。
编辑:使用@EBH在另一个答案中提供的备选方法是更清晰的方式来完成这个任务。
searchVal=idiorand(n, j);
il = 1;
iu = sizeCumQZ;
while (il+1<iu)
lw=floor((il+iu)/2);
if cumQz(lw,zid(n, j-1)) >= searchVal
iu=lw;
else
il=lw;
end
end
if cumQz(il,zid(n, j-1))>=searchVal
zid(n,j) = il;
else
zid(n,j) = iu;
end
interpn
函数在方法检查、错误管理和数据格式管理方面会明显减慢速度。标准的
interpn
使用了大约100行代码,每次调用可以缩减到2行,并通过知道我们只需要一种插值类型并且我们的数据符合特定格式来显著提高性能。为此,我们直接使用
griddedInterpolant
函数(见下文)。同样,在
nsims = 500
的情况下,使用
interpn
函数(仍然使用未更改的
find
代码)运行时间为
29.8秒。使用下面的优化方法,运行时间减少到
20.4秒。
优化方法替换了这里所示的interp
调用。
st(k).ksim(:,j) = interpn(ks, zs , squeeze(kprime(:,xid(j),:,sid(j))), st(k).ksim(:,j-1),zsim(:,j-1),'linear');
st(k).Vsim(:,j) = interpn(ks, zs , squeeze(V(:,xid(j),:,sid(j))), st(k).ksim(:,j-1),zsim(:,j-1),'linear');
st(k).Psim(:,j) = interpn(ks, zs , squeeze(P(:,xid(j),:,sid(j))), st(k).ksim(:,j-1),zsim(:,j-1),'linear');
使用更多直接调用
griddedInterpolant
的方法,如下所示:
F = griddedInterpolant(ks,zs,squeeze(kprime(:,xid(j),:,sid(j))), 'linear','none');
st(k).ksim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
F = griddedInterpolant(ks,zs,squeeze(V(:,xid(j),:,sid(j))), 'linear','none');
st(k).Vsim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
F = griddedInterpolant(ks,zs,squeeze(P(:,xid(j),:,sid(j))), 'linear','none');
st(k).Psim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
将二分搜索与find
替换为对griddedInterpolant
的调用,可以将总运行时间缩短到3.8秒,几乎比初始运行时间快了8倍。
还有三个需要注意的点:
1)我建议遵循先前答案中概述的建议,尽可能避免使用结构,并将您可以移出循环的任何内容移出(特别是ndgrid,因为这肯定只需要运行一次)。
2)我注意到,在使用nsims=5000
时,此脚本使用的总内存接近2.5G。如果这接近您系统的总可用内存,则可能会导致显着的减速。在这种情况下,我建议执行较小的计算批次,保存结果,然后进行进一步的计算。
3)最后,在我的测试中,使用parfor
比使用标准的for循环更慢。我认为这是因为在这种情况下,每个单独的循环都是一个相对较短的操作,而指定并行作业和管理集群工作人员所需的开销超过了并行处理的收益。如果您在一台大型计算机集群上,情况可能不同,但在我的单个(4核)机器上,parfor
只会导致减速。如果您愿意,我建议在进行上述推荐更改后,使用您的实际工作代码对每种情况进行测试。
我所做的全部代码更改如下所示,其中不包括使用结构体或其他小优化所建议的任何更改。
load file
tic;
nsim = 500
T = 12;
N = 1000;
searchVal=1;
il = 1;
iu = 1;
cumQx = cumsum(Qx);
cumQz = cumsum(Qz);
cumQs = cumsum(Qs);
sizeCumQZ = size(cumQz,1);
for k=1:nsim
st(k).ksim = kstar*ones(N, T);
st(k).Vsim = zeros(N,T);
st(k).Psim = zeros(N,T);
end
for k = 1:nsim
sysrand = rand(T, 1);
idiorand = rand(N, T);
sigmarand = rand(T,1);
xid = zeros(T, 1);
zid = zeros(N, T);
sid = zeros(T,1);
xid(1) = 8;
zid(:, 1) = 5;
sid(1) = 1;
simx = zeros(T,1);
zsim = ones(N,T)*zbar;
simsx = zeros(T,1);
[ks,zs] = ndgrid(kgrid,z);
for j = 2:T
sid(j) = find(cumQs(:, sid(j-1)) >= sigmarand(j), 1);
simsx(j-1) = sigmax(sid(j));
xid(j) = find(cumQx(:, xid(j-1)) >= sysrand(j), 1);
simx(j-1) = x(xid(j));
for n = 1:N
searchVal=idiorand(n, j);
il = 1;
iu = sizeCumQZ;
while (il+1<iu)
lw=floor((il+iu)/2);
if cumQz(lw,zid(n, j-1)) >= searchVal
iu=lw;
else
il=lw;
end
end
if cumQz(il,zid(n, j-1))>=searchVal
zid(n,j) = il;
else
zid(n,j) = iu;
end
zsim(n,j-1) = z(zid(n,j));
end
F = griddedInterpolant(ks,zs,squeeze(kprime(:,xid(j),:,sid(j))), 'linear','none');
st(k).ksim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
F = griddedInterpolant(ks,zs,squeeze(V(:,xid(j),:,sid(j))), 'linear','none');
st(k).Vsim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
F = griddedInterpolant(ks,zs,squeeze(P(:,xid(j),:,sid(j))), 'linear','none');
st(k).Psim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
end;
end
toc;
编辑:通过进一步调整griddedInterpolant
,我将三个插值器合并为一个,并通过连接K、V和P插值器的网格和值,成功地获得了额外的15%速度提升。在代码顶部(最好在循环外),我用以下代码替换了原始网格创建:
zRange = max(z(:))-min(z(:))+1;
zzzGrid = [z;z+1*zRange;z+2*zRange];
[ksBig,zsBig] = ndgrid(kgrid,zzzGrid);
nZ = numel(z);
valGrid = zeros(size(ksBig));
将3个对griddedInterpolant
的调用替换为:
valGrid(:,1:nZ) = squeeze(kprime(:,xid(j),:,sid(j)));
valGrid(:,nZ+1:2*nZ) = squeeze(V(:,xid(j),:,sid(j)));
valGrid(:,2*nZ+1:3*nZ) = squeeze(P(:,xid(j),:,sid(j)));
F = griddedInterpolant(ksBig,zsBig,valGrid, 'linear','none');
st(k).ksim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1));
st(k).Vsim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1)+zRange);
st(k).Psim(:,j) = F(st(k).ksim(:,j-1),zsim(:,j-1)+2*zRange);
如果我们可以接受更加复杂的设置来换取19%的增长而不是15%的增长,我们可以将 griddedInterpolant
移到 j 循环之外。在代码开头,设置网格如下:
zRange = max(z(:))-min(z(:))+1;
zzzGrid = [z;z+1*zRange;z+2*zRange];
zzzRange = max(zzzGrid(:))-min(zzzGrid(:))+1;
zzzTGrid = [];
for j = 2:T
zzzTGrid(end+1:end+numel(zzzGrid)) = zzzGrid+(j-2)*zzzRange;
end
[ksBig,zsBig] = ndgrid(kgrid,zzzTGrid);
nZ = numel(z);
valGrid = zeros(size(ksBig));
将之前的griddedInterpolant
替换为以下内容:
for j = 2:T
valGrid(:,(1:nZ)+3*nZ*(j-2)) = squeeze(kprime(:,xid(j),:,sid(j)));
valGrid(:,(nZ+1:2*nZ)+3*nZ*(j-2)) = squeeze(V(:,xid(j),:,sid(j)));
valGrid(:,(2*nZ+1:3*nZ)+3*nZ*(j-2)) = squeeze(P(:,xid(j),:,sid(j)));
end;
F = griddedInterpolant(ksBig,zsBig,valGrid, 'linear','none');
for j = 2:T
st(k).ksim(:,j) = F(stTest(k).ksim(:,j-1),zsim(:,j-1)+3*zRange*(j-2));
st(k).Vsim(:,j) = F(stTest(k).ksim(:,j-1),zsim(:,j-1)+zRange+3*zRange*(j-2));
st(k).Psim(:,j) = F(stTest(k).ksim(:,j-1),zsim(:,j-1)+2*zRange+3*zRange*(j-2));
end
如果我们想更加挑剔(增加2%的速度),我们可以将所有调用 squeeze
的地方替换为一个不进行错误检查的版本,即 mySqueeze
:
function b = mySqueeze(a)
siz = size(a);
siz(siz==1) = [];
siz = [siz ones(1,2-length(siz))];
b = reshape(a,siz);