How to speed up a "for" loop in a Python function?


I have a function called var. I want to know the best way to run the for loop inside this function (over multiple coordinates: xs and ys) with multiprocessing/parallel processing, making use of all the processors, cores, and RAM the system has.

Is it possible to do this with the Dask module?

The pysheds documentation can be found here.

import numpy as np
from pysheds.grid import Grid

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

for (x, y) in zip(xs, ys):
    grid = Grid.from_raster('E:/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=1500, xytype='label')
    ....
    ....
    results


You could try using numba. – Mykola Zotko
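For context, numba's @njit only helps when the loop body is plain numeric code it can compile; it will not, in general, accelerate calls into pysheds itself. A generic sketch of the kind of loop numba does speed up (illustrative only, not specific to this question):

import numpy as np
from numba import njit

@njit
def mean_of_squares(a):
    # A plain numeric loop: numba compiles this to machine code.
    total = 0.0
    for i in range(a.size):
        total += a[i] * a[i]
    return total / a.size

values = np.random.rand(1_000_000)
mean_of_squares(values)  # first call compiles; subsequent calls run fast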
2 Answers

You didn't post a link to your image1.tif file, so the sample code below uses pysheds/data/dem.tif from https://github.com/mdbartos/pysheds. The basic idea is to split the input parameters (xs and ys in your case) into subsets, then assign each CPU a different subset to process. main() computes the solution twice, once sequentially and once in parallel, and compares the two. The parallel solution has some inefficiency, since every CPU reads the image file, so there is room for improvement (i.e., read the image file outside the parallel section and hand the resulting grid object to each instance; see the sketch after the code below).
import numpy as np
from pysheds.grid import Grid
from dask.distributed import Client
from dask import delayed, compute

xs = 10, 20, 30, 40, 50, 60, 70, 80, 90, 100
ys = 25, 35, 45, 55, 65, 75, 85, 95, 105, 115

def var(image_file, x_in, y_in):
    grid = Grid.from_raster(image_file, data_name='map')
    variable_avg = []
    for (x,y) in zip(x_in,y_in):
        grid.catchment(data='map', x=x, y=y, out_name='catch')
        variable = grid.view('catch', nodata=np.nan)
        variable_avg.append( np.array(variable).mean() )
    return(variable_avg)

def var_parallel(n_cpu, image_file, x_in, y_in):
    tasks = []
    for cpu in range(n_cpu):
        x_sub = x_in[cpu::n_cpu]  # e.g. cpu = 0: x_sub = (10, 40, 70, 100)
        y_sub = y_in[cpu::n_cpu]
        tasks.append( delayed(var)(image_file, x_sub, y_sub) )
    ans = compute(tasks)
    # reassemble the solution in the original order
    par_avg = [None]*len(x_in)
    for cpu in range(n_cpu):
        par_avg[cpu::n_cpu] = ans[0][cpu]
    print('AVG (parallel)  =',par_avg)
    return par_avg

def main():
    image_file = 'pysheds/data/dem.tif'
    # sequential solution:
    seq_avg = var(image_file, xs, ys)
    print('AVG (sequential)=',seq_avg)
    # parallel solution:
    n_cpu = 3
    dask_client = Client(n_workers=n_cpu)
    par_avg = var_parallel(n_cpu, image_file, xs, ys)
    dask_client.shutdown()
    print('max error=',
        max([ abs(seq_avg[i]-par_avg[i]) for i in range(len(seq_avg))]))

if __name__ == '__main__': main()
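As for the improvement mentioned above, here is a rough sketch of reading the raster once and shipping the Grid object to the workers. This assumes pysheds Grid instances can be pickled by dask, which is untested; note also that catchment() writes into the grid, so each task must end up with its own copy (here that should hold with one task per worker, though the scheduler does not strictly guarantee it):

def var_on_grid(grid, x_in, y_in):
    # Same as var(), but operates on an already-loaded Grid instead of
    # re-reading the raster from disk in every task.
    variable_avg = []
    for (x, y) in zip(x_in, y_in):
        grid.catchment(data='map', x=x, y=y, out_name='catch')
        variable = grid.view('catch', nodata=np.nan)
        variable_avg.append(np.array(variable).mean())
    return variable_avg

def var_parallel_shared(n_cpu, grid, x_in, y_in):
    tasks = [delayed(var_on_grid)(grid, x_in[cpu::n_cpu], y_in[cpu::n_cpu])
             for cpu in range(n_cpu)]
    ans = compute(tasks)
    par_avg = [None]*len(x_in)
    for cpu in range(n_cpu):
        par_avg[cpu::n_cpu] = ans[0][cpu]
    return par_avg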

I don't understand why you wouldn't use multiprocessing.Pool? Set the pool size to the number of CPUs, or use os.cpu_count(), and then do: for x, y in zip(xs, ys): _async_proc = processes_pool.apply_async(var, (imag_name, x, y)) – StefanMZ
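For completeness, a minimal sketch of the Pool approach from that comment, assuming the var() function defined in the answer above (apply_async returns immediately; get() blocks until each result is ready, and each task still re-reads the raster):

import os
from multiprocessing import Pool

if __name__ == '__main__':
    image_file = 'pysheds/data/dem.tif'
    with Pool(os.cpu_count()) as pool:
        # Submit one task per (x, y) pair; wrap each coordinate in a
        # 1-tuple because var() expects iterables of coordinates.
        async_results = [pool.apply_async(var, (image_file, (x,), (y,)))
                         for x, y in zip(xs, ys)]
        # get() results come back in submission order.
        point_avgs = [r.get()[0] for r in async_results]
    print(point_avgs)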


I have tried to provide reproducible code using dask. You can add the main pysheds processing, or any other function, inside it to iterate over the parameters in parallel and speed things up.

The documentation for the dask module can be found here.

import dask
from dask import delayed, compute
from dask.distributed import Client, progress
from pysheds.grid import Grid

client = Client(threads_per_worker=2, n_workers=2)  # choose the number of workers and threads per worker for your task here

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

# First, define a function that contains the per-pair processing to be iterated.
def var(x, y):
    grid = Grid.from_raster('data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=1500, xytype='label')
    ...
    ...
    return result

# Now call the function the 'dask' way: build a list of lazy tasks first.
lazy_results = []

for (x, y) in zip(xs, ys):
    lazy_result = dask.delayed(var)(x, y)
    lazy_results.append(lazy_result)

# Final command to actually execute var(x, y) for every pair and get the results.
results = dask.compute(*lazy_results)
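As a side note, the progress helper imported above pairs with futures rather than delayed objects; a hypothetical variant that submits the same work through client.map and shows a live progress bar:

# Submit one future per (x, y) pair; progress() renders a live bar.
futures = client.map(var, xs, ys)
progress(futures)
results = client.gather(futures)  # results come back in input order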
