你发布的程序在我的电脑上执行时间约为96秒。在探索并行计算之前,让我们优化一些内容。
alpha=numpy.zeros(Data.shape[0])
for i in range(0,Data.shape[0]):
Data[i]=Data[i]-numpy.mean(Data[i])
alpha[i]=1./numpy.linalg.norm(Data[i])
for i in range(0,Data.shape[0]):
for j in range(i,Data.shape[0]):
if(i==j):
CORR_CR[i][j]=1;
else:
corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
corr=int(numpy.absolute(corr)>=0.9)
CORR_CR[i][j]=CORR_CR[j][i]=corr
计算时间已经降至17秒。
data=[]
ii=[]
jj=[]
...
if(corr!=0):
data.append(corr)
ii.append(i)
jj.append(j)
data.append(corr)
ii.append(j)
jj.append(i)
...
CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))
计算时间降至13秒(改进可以忽略不计?),内存占用大大减少。如果考虑更大的数据集,这将是一个重大的改进。
correlator.pyx
的内容,从Numpy vs Cython speed开始,如下所示:
import numpy
cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):
cdef unsigned int rows = array.shape[0]
cdef int cols = array.shape[1]
cdef unsigned int row, row2
cdef int one=1
ii=[]
jj=[]
data=[]
for row in range(imin,imax):
for row2 in range(row,rows):
if row==row2:
data.append(0)
ii.append(row)
jj.append(row2)
else:
corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
corr=int(numpy.absolute(corr)>=0.9)
if(corr!=0):
data.append(corr)
ii.append(row)
jj.append(row2)
data.append(corr)
ii.append(row2)
jj.append(row)
return ii,jj,data
使用scipy.linalg.cython_blas.ddot()
函数进行内积计算。
为了将.pyx文件进行cythonize和编译,可以使用以下makefile(假设您正在使用Linux...)
all: correlator correlatorb
correlator: correlator.pyx
cython -a correlator.pyx
correlatorb: correlator.c
gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
新的相关函数在主要的Python文件中被称为:
import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])
通过使用编译好的循环,时间缩短到了5.4秒!已经快了十倍。而且这些计算是在单个进程上执行的!
让我们介绍并行计算。
请注意,向函数process
添加了两个参数:imin
和imax
。这些参数用于表示要计算的CORR_CR
行的范围。 这是为了预先考虑使用并行计算。 实际上,并行化程序的简单方法是将外部for
循环(索引i
)拆分成不同的进程。
每个进程将为特定的i
索引范围执行外部for循环,以平衡进程之间的工作负载。
程序必须完成以下操作:
- 进程0("根进程")从文件中读取向量
Data
。
- 使用MPI函数
bcast()
将Data
广播到所有进程。
- 计算每个进程要考虑的索引
i
的范围。
- 每个进程都会为相应的索引计算相关系数。矩阵的非零项会存储在各自进程的
data
,ii
,jj
列表中。
- 通过调用MPI函数
gather()
将这些列表汇集到根进程上。它会生成三个Size
列表的列表,可以将其连接起来以获取创建稀疏邻接矩阵所需的3个列表。
以下是代码:
import numpy
from mpi4py import MPI
import time
import scipy.sparse
import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)
Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)
Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
lin=numpy.linspace(0.,1.,515)
for i in range(Data.shape[0]):
Data[i]+=numpy.sin((1+i%10)*10*lin)*100
start=time.time();
Data=MPI.COMM_WORLD.bcast(Data, root=0)
RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
Data[i]=Data[i]-numpy.mean(Data[i])
if numpy.linalg.norm(Data[i])==0:
print "process "+str(Rank)+" is facing a big problem"
else:
alpha[i]=1./numpy.linalg.norm(Data[i])
ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
icurr=0
for i in range(Size):
nbjob=0
while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
nbjob+=(Data.shape[0]-icurr)
icurr+=1
ilimits[i+1]=icurr
ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)
if Rank==0:
data2=sum(data,[])
ii2=sum(ii,[])
jj2=sum(jj,[])
CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))
print CORR_CR
end=time.time();
elapsed=(end-start)
print('Total Time',elapsed)
通过运行
mpirun -np 4 main.py
,计算时间为3.4秒。它并不比单进程快4倍...这很可能是由于计算的瓶颈在于标量积的计算,需要大量的内存带宽。而我的个人电脑在内存带宽方面非常有限...
最后一句话:有很多改进的可能性。
- 在每个进程中复制
Data
向量...这会影响程序所需的内存。以不同的方式分配计算,并通过交换内存与通信来解决此问题...
- 每个进程仍在计算所有向量的范数...可以通过让每个进程计算某些向量的范数,并在
alpha
上使用MPI函数
allreduce()
来改善...
该如何处理这个邻接矩阵?
我认为你已经知道这个问题的答案,但你可以将这个邻接矩阵提供给
sparse.csgraph
例程,如
connected_components()
或
laplacian()
。实际上,你离
谱聚类并不远!
例如,如果聚类很明显,则使用
connected_components()
就足够了:
if Rank==0:
S=CORR_CR.tocsr()
print S
n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)
print "number of different famillies "+str(n_components)
numpy.set_printoptions(threshold=numpy.nan)
print labels
mpirun -np 10 python main.py
,文件将被读取10次,相关矩阵将被计算10次,并且总时间将被打印10次。为了克服这个困难并减少内存占用,您需要将计算分配给进程,并使用MPI函数将所需数据传递给需要它的进程...请参阅https://mpi4py.scipy.org/docs/usrman/tutorial.html了解这些函数。 - francisData1 - numpy.mean(Data1)
,diff_Data2 = Data2 - numpy.mean(Data2)
不需要在每个相关点执行:您可以在任何对corr_coef
的调用之前一次性替换Data[i]=Data[i]- numpy.mean(Data[i])
。此外,存储alpha[:]=1./numpy.linalg.norm(Data[i])
很轻便,并且可以避免在corr_coef()
期间进行任何对numpy.linalg.norm()
的调用。最后,corr_coef(i,j)
只需编写numpy.inner(Data[i], Data[j])*alpha[i]*alpha[j]
即可。 - francisnditer
代替for循环...对于numpy数组,它通常比for循环更快。请参见这些示例,特别是那些使用标志multi_index
的示例。 - francisscipy.sparse.csr_matrix
和https://dev59.com/jGUo5IYBdhLWcg3wnwn2。 - francis