使用多进程的for循环以追加到列表中作为结果

3
我需要将一个for循环并行化。我的当前代码遍历了一个从xarray数据集中获取的id列表,使用相应的id从xarray数据集中获取行数据,调用一个函数(计算数据的三角分布),将函数结果附加到列表中,完成后将列表转换为 xarray 数据集,其中每个结果与相应的 id 相关联,以便稍后将此数据集附加到“主”数据集中。
我的代码大致如下:
from sklearn.preprocessing import MinMaxScaler
import xarray as xr
import scipy.stats as st

function call_func(data):
   scaler = MinMaxScaler()
   norm_data = scaler.fit_transform(np.reshape(data, (len(data),1)))
   params = st.triang.fit(norm_data)
   arg,loc,scale = params[:-2],params[-2],params[-1]
   dist = st.triang(loc=loc, scale=scale, *arg)
   return dist

if __name__ == "__main__":
for id in my_dataset['id'].values:
        row_data= my_dataset.sel(id=id)['data'].values[0]
        if len(row_data)>3 and all(row_data== 0) == False:
                result = call_func(row_data)
                result_list.append(result)
        else:
            result_list.append([])

new_dataset = xr.Dataset({'id': my_dataset['id'].values,
                          'dist_data':(['id','dist'],
                           np.reshape(np.array(result_list),(len(result_list),1)))
                           })

由于id_array很大,我希望可以并行化循环。这是一个通用的问题,但我对多进程工具还不够熟悉。您有什么建议如何将多进程与此任务结合起来吗?我的研究表明,使用multiprocessing和向列表中添加元素并不是最明智的选择。

1个回答

5
我会尝试给出一个简单的示例,希望你能推断出所需的修改并将内容更加易懂。以下是一段常规循环代码的版本:
id_array = [*range(10)]

result = []
for id in id_array:
    if id % 2 == 0:
        result.append((id, id))
    else:
        result.append((id, id ** 2))

print(result)

输出:

[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49), (8, 8), (9, 81)]


在这里,我使用ProcessPoolExecutor,将其分为4个进程并行化:

from concurrent.futures import ProcessPoolExecutor

id_array = [*range(10)]


def myfunc(id):
    if id % 2 == 0:
        return id, id
    else:
        return id, id ** 2


result = []
with ProcessPoolExecutor(max_workers=4) as executor:
    for r in executor.map(myfunc, id_array):
        result.append(r)

print(result)

输出结果(相同):

[(0, 0)、(1, 1)、(2, 2)、(3, 9)、(4, 4)、(5, 25)、(6, 6)、(7, 49)、(8, 8)、(9, 81)]


基本步骤如下:

  1. for 循环中的内容提取到一个函数中,该函数返回所需的值
  2. 使用 ProcessPoolExecutorexecutor.map(myfunc, id_array)
  3. 将返回的值附加到您的结果列表中。

谢谢您的建议。我会尝试实现它。然而,在Jupyter中尝试您的代码时,出现了以下错误消息: BrokenProcessPool:在未来运行或挂起时,进程池中的一个进程突然终止。 - zeniapy
2
也许这个关于在交互式 shell 中运行 ProcessPoolExecutor 的答案可以帮到你:https://dev59.com/52Uo5IYBdhLWcg3wnwj2#15918742 - Adam.Er8

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接