MPI python-Open-MPI

9
我有一个20,000*515的numpy矩阵,代表着生物数据。我需要找出生物数据的相关系数,结果将是一个20,000*20,000的矩阵,其中包含相关值。然后,如果每个相关系数大于阈值,我将使用1和0填充numpy数组。
我使用numpy.corrcoef查找相关系数,在我的机器上运行良好。
然后,当我尝试将其放入具有10台计算机和节点从2到8变化的集群中时,每个节点生成(40)随机数字,并从生物数据中获取这些40个随机列,结果会遇到内存问题。
然后,我尝试重写程序,例如获取每一行查找相关系数,如果该值大于阈值,则将1存储在矩阵中,否则存储0,而不是创建相关矩阵。但是,运行此程序需要1.30小时。我需要运行100次。
请问是否可以提出更好的解决方案,例如如何通过在每个核心完成任务后分配作业来解决内存问题。我对MPI很陌生。下面是我的代码。
如果您需要更多信息,请让我知道。 谢谢
    import numpy
    from mpi4py import MPI
    import time


    Size=MPI.COMM_WORLD.Get_size();
    Rank=MPI.COMM_WORLD.Get_rank();
    Name=MPI.Get_processor_name();

    RandomNumbers={};





    rndm_indx=numpy.random.choice(range(515),40,replace=False)
    rndm_indx=numpy.sort(rndm_indx)


    Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);




    RandomNumbers[Rank]=rndm_indx;
    CORR_CR=numpy.zeros((Data.shape[0],Data.shape[0]));


    start=time.time();
    for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-np.mean(Data[i]);
        alpha1=1./np.linalg.norm(Data[i]);
        for j in range(i,Data.shape[0]):
           if(i==j):
               CORR_CR[i][j]=1;
           else:
               Data[j]=Data[j]-np.mean(Data[j]);
               alpha2=1./np.linalg.norm(Data[j]);
               corr=np.inner(Data[i],Data[j])*(alpha1*alpha2);
               corr=int(np.absolute(corrcoef)>=0.9)
               CORR_CR[i][j]=CORR_CR[j][i]=corr
    end=time.time();           

    CORR_CR=CORR_CR-numpy.eye(CORR_CR.shape[0]);  


    elapsed=(end-start)
    print('Total Time',elapsed)

1
欢迎来到Stackoverflow!这是一个非常有趣的问题。在MPI中,默认情况下,所有代码都会被所有进程执行。如果你运行mpirun -np 10 python main.py,文件将被读取10次,相关矩阵将被计算10次,并且总时间将被打印10次。为了克服这个困难并减少内存占用,您需要将计算分配给进程,并使用MPI函数将所需数据传递给需要它的进程...请参阅https://mpi4py.scipy.org/docs/usrman/tutorial.html了解这些函数。 - francis
谢谢Francis。我会查看MPI命令,同时你有更好的找到相关系数以减少执行时间的想法吗?现在它需要近2小时。 - RG20
1
操作Data1 - numpy.mean(Data1)diff_Data2 = Data2 - numpy.mean(Data2)不需要在每个相关点执行:您可以在任何对corr_coef的调用之前一次性替换Data[i]=Data[i]- numpy.mean(Data[i])。此外,存储alpha[:]=1./numpy.linalg.norm(Data[i])很轻便,并且可以避免在corr_coef()期间进行任何对numpy.linalg.norm()的调用。最后,corr_coef(i,j)只需编写numpy.inner(Data[i], Data[j])*alpha[i]*alpha[j]即可。 - francis
1
可能有一种方法可以使用nditer代替for循环...对于numpy数组,它通常比for循环更快。请参见这些示例,特别是那些使用标志multi_index的示例。 - francis
1
最后,如果高相关性很少见,使用稀疏矩阵将大大减少您的内存需求。例如,请参阅scipy.sparse.csr_matrix和https://dev59.com/jGUo5IYBdhLWcg3wnwn2。 - francis
谢谢Francis。我已经更新了我的代码,但仍然使用for循环。我是Python的新手,让我尝试使用nditer。再次感谢。 - RG20
1个回答

11

你发布的程序在我的电脑上执行时间约为96秒。在探索并行计算之前,让我们优化一些内容。

  • 将向量的范数存储起来,以避免每次需要时都重新计算。alpha1=1./numpy.linalg.norm(Data[i]);移出第二个循环是一个好的起点。由于向量在计算过程中不会改变,它们的范数可以提前计算:

alpha=numpy.zeros(Data.shape[0])
for i in range(0,Data.shape[0]):
  Data[i]=Data[i]-numpy.mean(Data[i])
  alpha[i]=1./numpy.linalg.norm(Data[i])

for i in range(0,Data.shape[0]):
  for j in range(i,Data.shape[0]):
    if(i==j):
       CORR_CR[i][j]=1;
    else:
       corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
       corr=int(numpy.absolute(corr)>=0.9)
       CORR_CR[i][j]=CORR_CR[j][i]=corr

计算时间已经降至17秒。

  • 假设向量之间不高度相关,大部分相关系数将会四舍五入为零。因此矩阵很可能是稀疏的(充满零)。让我们使用非常易于填充的scipy.sparse.coo_matrix稀疏矩阵格式:非空项和它们的坐标 i,j 将被存储在列表中。

data=[]
ii=[]
jj=[]
...
  if(corr!=0):
           data.append(corr)
           ii.append(i)
           jj.append(j)
           data.append(corr)
           ii.append(j)
           jj.append(i)
...
CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))

计算时间降至13秒(改进可以忽略不计?),内存占用大大减少。如果考虑更大的数据集,这将是一个重大的改进。

  • Python中的for循环效率相对较低。例如,请参见for loop in python is 10x slower than matlab。但是有很多解决方法,例如使用向量化操作或优化的迭代器,例如由numpy.nditer提供的迭代器。for循环效率低的原因之一是Python是解释性语言:在处理过程中没有编译发生。因此,为了克服这个问题,代码中最棘手的部分可以通过使用Cython这样的工具进行编译。

    1. 代码的关键部分写在Cython中,放在专门的文件correlator.pyx中。

    2. Cython将此文件转换为correlator.c文件

    3. 此文件由您喜欢的C编译器gcc编译,以构建共享库correlator.so

    4. 导入correlator后,可以在程序中使用优化的函数。

correlator.pyx的内容,从Numpy vs Cython speed开始,如下所示:

import numpy

cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):

    cdef unsigned int rows = array.shape[0]
    cdef  int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef  int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
               data.append(0)
               ii.append(row)
               jj.append(row2)
            else:
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)

                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)

    return ii,jj,data

使用scipy.linalg.cython_blas.ddot()函数进行内积计算。

为了将.pyx文件进行cythonize和编译,可以使用以下makefile(假设您正在使用Linux...)

all: correlator correlatorb


correlator: correlator.pyx
    cython -a correlator.pyx

correlatorb: correlator.c
    gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
新的相关函数在主要的Python文件中被称为:
import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])

通过使用编译好的循环,时间缩短到了5.4秒!已经快了十倍。而且这些计算是在单个进程上执行的!

让我们介绍并行计算。 请注意,向函数process添加了两个参数:iminimax。这些参数用于表示要计算的CORR_CR行的范围。 这是为了预先考虑使用并行计算。 实际上,并行化程序的简单方法是将外部for循环(索引i)拆分成不同的进程。

每个进程将为特定的i索引范围执行外部for循环,以平衡进程之间的工作负载。

程序必须完成以下操作:

  1. 进程0("根进程")从文件中读取向量Data
  2. 使用MPI函数bcast()Data广播到所有进程。
  3. 计算每个进程要考虑的索引i的范围。
  4. 每个进程都会为相应的索引计算相关系数。矩阵的非零项会存储在各自进程的dataiijj列表中。
  5. 通过调用MPI函数gather()将这些列表汇集到根进程上。它会生成三个Size列表的列表,可以将其连接起来以获取创建稀疏邻接矩阵所需的3个列表。

以下是代码:

import numpy
from mpi4py import MPI
import time
import scipy.sparse

import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here. 
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
         Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#braodcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)

RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
        Data[i]=Data[i]-numpy.mean(Data[i])
        if numpy.linalg.norm(Data[i])==0:
            print "process "+str(Rank)+" is facing a big problem"
        else:
            alpha[i]=1./numpy.linalg.norm(Data[i])


#balancing the computational load between the processes. 
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlation.
#each process cares for a set of rows. 
#Of course, the last rank must care about more rows than the first one.

ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)          

#the "local" job has been cythonized in file main2.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every processes on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
   #concatenate the lists
   data2=sum(data,[])
   ii2=sum(ii,[])
   jj2=sum(jj,[])
   #create the adjency matrix
   CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))

   print CORR_CR

end=time.time();           

elapsed=(end-start)
print('Total Time',elapsed)
通过运行mpirun -np 4 main.py,计算时间为3.4秒。它并不比单进程快4倍...这很可能是由于计算的瓶颈在于标量积的计算,需要大量的内存带宽。而我的个人电脑在内存带宽方面非常有限...
最后一句话:有很多改进的可能性。 - 在每个进程中复制Data向量...这会影响程序所需的内存。以不同的方式分配计算,并通过交换内存与通信来解决此问题... - 每个进程仍在计算所有向量的范数...可以通过让每个进程计算某些向量的范数,并在alpha上使用MPI函数allreduce()来改善... 该如何处理这个邻接矩阵? 我认为你已经知道这个问题的答案,但你可以将这个邻接矩阵提供给sparse.csgraph例程,如connected_components()laplacian()。实际上,你离谱聚类并不远!
例如,如果聚类很明显,则使用connected_components()就足够了:
if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)

    print "number of different famillies "+str(n_components)

    numpy.set_printoptions(threshold=numpy.nan)
    print labels

谢谢你,Francis。我非常感激。 - RG20
我在哪里可以找到更多关于scipy.linalg.cython_blas.ddot()的信息?谢谢。 - RG20
1
scipy.linalg.cython_blas.ddot()的文档不是很详细... 它只说明scipy.linalg.cython_blas.ddot是一个原始函数指针(Fortran风格的指针参数),对应于BLAS的函数ddot() - francis
1
另外,对于一维数组,numpy.inner()numpy.dot() 相似。https://dev59.com/BGgu5IYBdhLWcg3w2ahm 此外,numpy.dot() 通常会分派到 BLAS 的 ddot() 上。https://dev59.com/8WIj5IYBdhLWcg3w04Xd - francis
3
非常感谢您提供如此详细和清晰的答复!这对我优化代码非常有帮助。 - Sheece Gardazi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接