如何使用mpi4py并行化这个Python脚本?

5

如果已经有人问过了,我道歉。但我已经阅读了大量文档,仍然不确定如何做我想做的事情。

我希望同时在多个核心上运行Python脚本。

我有1800个.h5文件在一个目录中,它们的名称为“snaphots_s1.h5”,“snapshots_s2.h5”等,每个文件约30MB大小。这个Python脚本:

  1. 从目录中逐个读取h5py文件。
  2. 提取和操作h5py文件中的数据。
  3. 创建所提取数据的图表。

完成后,脚本接着从目录中读取下一个h5py文件并执行相同的过程。因此,在执行此操作期间,没有任何处理器需要通信。

脚本如下:

import h5py
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import cmocean
import os  

from mpi4py import MPI

de.logging_setup.rootlogger.setLevel('ERROR')

# Plot writes

count = 1
for filename in os.listdir('directory'):  ### [PERF] Applied to ~ 1800 .h5 files
    with h5py.File('directory/{}'.format(filename),'r') as file:

         ### Manipulate 'filename' data.  ### [PERF] Each fileI ~ 0.03 TB in size
         ...

         ### Plot 'filename' data.        ### [PERF] Some fileO is output here
         ...
count = count + 1

理想情况下,我希望使用mpi4py来完成这个任务(出于各种原因),不过我也可以考虑其他选项,比如multiprocessing.Pool(但事实上我没能让它起作用。我试图按照这里所述的方法)。
那么,我的问题是:我需要在脚本中输入哪些命令才能使用mpi4py并行处理呢?或者,如果这个选项不可行,我该如何并行化脚本呢?

mpi4py 中是否有特定的内容会排除 multiprocessing.Pool?我不熟悉 h5py 或 mpi4py,但非常熟悉 multiprocessing。对我来说,这似乎是一个需要将任务拆分为一组带有文件名参数的工作进程池的任务。 - Hannu
@Hannu 我不确定它是否适用于我正在使用的模块。但是,如果你能解释一下multiprocessing模块,我会尝试一下。 - Matthew Cassell
需要 HPC 布局吗?[1]:如果在纯 [SERIAL] 调度下运行,工作包处理需要多少 CPU 天才能从头到尾完成?[2]:此 <Manipulate 'filename; data> + <Plot 'filename' data> 包含多少个文件 x 每个文件有多少 [TB] 需要处理?[3]:在获得 HPC 布局运行您的工作包的批准之前,您计划总共花费多少人天的人力资源来进行原型设计和微调 HPC 部分? - user3666197
@user3666197 我不知道你所说的 HPC fabric 是什么意思。将其依次应用于 1800 个 .h5 文件时,处理需要大约 6 小时。每个文件的大小约为 0.03TB。我不打算在这方面花费太长时间。如果 multiprocessing 模块可行的话,我可能只会学习它并使用它。 - Matthew Cassell
你确定吗?给定上述数字,每个文件的大小为0.03E+12[B],在6 x 60 x 60 ~ 21,600 [秒] / 1800 [1]个文件中,每个文件处理约需要~12 [秒]。如果需要在这12 [秒]内只加载一个文件,则需要具有超过2.33 [GByte/s]的零延迟读取通道,且不对数据进行任何计算并且最终不产生输出。还有其他事情发生。HPC-fabric是HPC基础设施的垂直层次结构{HPC节点+控制节点+HPC文件系统+HPC数据分配连接性+HPC控制平面连接性+HPC工作包流}。 - user3666197
@user3666197 我刚刚意识到,这6个小时对应的是我有3600个文件的时候,其中一半我现在已经删除了,因为数据太多了。所以每个文件实际上只需要大约一半的时间。请原谅我的错误。 - Matthew Cassell
3个回答

2
你应该选择使用多进程,而Javier的例子应该可以工作,但我想把它分解开来,以便你可以理解每个步骤。

一般来说,在使用进程池时,你需要创建一个进程池,这些进程会空闲直到你给它们一些工作。最好的方法是创建一个函数,每个进程将单独执行该函数。

def worker(fn):
    with h5py.File(fn, 'r') as f:
        # process data..
        return result

那很简单。每个进程都会运行它,并将结果返回给父进程。
现在我们有了做工作的函数 worker,让我们为它创建输入数据。它需要一个文件名,因此我们需要列出所有的文件。
full_fns = [os.path.join('directory', filename) for filename in 
            os.listdir('directory')]

接下来初始化进程池。

import multiprocessing as mp
pool = mp.Pool(4)  # pass the amount of processes you want
results = pool.map(worker, full_fns)  

# pool takes a worker function and input data
# you usually need to wait for all the subprocesses done their work before 
using the data; so you don't work on partial data.

pool.join()
poo.close()

现在,您可以通过results访问您的数据。
for r in results:
    print r

请在评论中告诉我这个方法对您是否有效。


我很感激你花时间解释如何使用多进程,我相信这对其他人也会有用。这就是为什么我给了你赏金的原因。然而,这个解决方案对我并没有起作用。当我运行脚本时,飞船出现在我的 Mac 底角,然后几乎瞬间消失,什么都没有计算出来。 - Matthew Cassell
听起来工人们没有得到他们的任务。使用打印调试您的工作函数。首先检查它是否适用于单个进程(使用工作函数),然后再扩展到多个进程。 - Chen A.
{btsdaf} - Javier
@Javier 在池完成处理之前迭代结果意味着你将迭代部分数据。可能还有子进程在运行。为了确保所有子进程都完成,你应该使用 join() 方法加入到池中。 - Chen A.
{btsdaf} - Javier

1
你应该能够轻松地使用库来实现多进程。
from multiprocessing.dummy import Pool

def processData(files):
    print files
    ...
    return result

allFiles = glob.glob("<file path/file mask>")
pool = Pool(6) # for 6 threads for example
results = pool.map(processData, allFiles)

感谢您的帮助。 - Matthew Cassell

1

多进程编程不应该比这更复杂:

def process_one_file(fn):
    with h5py.File(fn, 'r') as f:
        ....
    return is_successful


fns = [os.path.join('directory', fn) for fn in os.listdir('directory')]
pool = multiprocessing.Pool()
for fn, is_successful in zip(fns, pool.imap(process_one_file, fns)):
    print(fn, "succedded?", is_successful)

感谢您的帮助。 - Matthew Cassell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接