To do this in MPI, you have to go through at least one extra step.
The problem is that with the general scatter/gather routines, MPI_Scatterv and MPI_Gatherv, you can pass a "vector" (the v) of counts and displacements rather than the single count of Scatter and Gather, but all of the types are assumed to be the same. There is no way around that here: each block has a different memory layout, so it has to be described by a different type. If the blocks differed in only one respect - some had a different number of columns, or some had a different number of rows - then different counts alone would be enough. But with both the columns and the rows varying, counts won't do it; you really need to be able to specify different types.
So what you really want is an often-discussed but never-implemented MPI_Scatterw routine (where w means vv; that is, both the counts and the types are vectors). No such routine exists, though. The closest you can get is the much more general
MPI_Alltoallw call, which allows completely general all-to-all sending and receiving of data; as the specification puts it,
"by making all but one process have sendcounts(i) = 0, this achieves an MPI_SCATTERW function."
So you can do this with MPI_Alltoallw by having every process other than the one that originally owns all the data (we'll assume it's rank 0 here) set all of its send counts to zero. Every task also sets all of its receive counts to zero except the first one - the amount of data it will get from rank 0.
For process 0's send side, we first need to define four different types (the 4 different sizes of subarray); then the send counts are all 1, and the only piece left is working out the send displacements (which, unlike with scatterv, are in units of bytes here, because there is no single type that could serve as the unit):
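Concretely, the per-rank setup looks like this; it is the same initialization that appears in the alltoall() routine of the full listing at the end. Everything defaults to zero counts of MPI_CHAR, and the only nonzero receive count is entry 0, covering the whole local block as contiguous characters:

int sendcounts[ size ];
int senddispls[ size ];
MPI_Datatype sendtypes[size];
int recvcounts[ size ];
int recvdispls[ size ];
MPI_Datatype recvtypes[size];

/* default: nothing sent, nothing received, described as contiguous chars */
for (int proc=0; proc<size; proc++) {
    recvcounts[proc] = 0;
    recvdispls[proc] = 0;
    recvtypes[proc] = MPI_CHAR;

    sendcounts[proc] = 0;
    senddispls[proc] = 0;
    sendtypes[proc] = MPI_CHAR;
}

/* the one incoming chunk is the whole local block, arriving from rank 0 */
recvcounts[0] = localsizes[0]*localsizes[1];
recvdispls[0] = 0;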
if (rank == 0) {
    /* 4 types of blocks: blocksize rows/cols, plus one extra row and/or column
       for the blocks on the last block-row and block-column */
    MPI_Datatype blocktypes[4];
    int subsizes[2];
    int starts[2] = {0,0};
    for (int i=0; i<2; i++) {
        subsizes[0] = blocksize+i;
        for (int j=0; j<2; j++) {
            subsizes[1] = blocksize+j;
            MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
            MPI_Type_commit(&blocktypes[2*i+j]);
        }
    }

    /* now figure out the displacement and type of each processor's block */
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);

        sendcounts[proc] = 1;
        senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

        int idx = typeIdx(row, col, blocks);
        sendtypes[proc] = blocktypes[idx];
    }
}

MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
              &(localdata[0][0]), recvcounts, recvdispls, recvtypes,
              MPI_COMM_WORLD);
And that will work.
The catch, though, is that the Alltoallw function is so completely general that it is hard for implementations to do much in the way of optimization; so I'd be surprised if this performed as well as a scatter of equally-sized blocks.
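As an aside, the same trick works in reverse: the spec's observation applies symmetrically to the recvcounts, so gathering the unequal blocks back onto rank 0 is just the same MPI_Alltoallw call with the two argument lists swapped. Here is a rough sketch, reusing the count/displacement/type arrays and the blocktypes from above (and assuming they are still in scope); it is not part of the listing at the end:

/* reverse direction: every rank sends its whole local block to rank 0,
   as contiguous chars; all other send counts stay zero */
for (int proc=0; proc<size; proc++) {
    sendcounts[proc] = 0;  senddispls[proc] = 0;  sendtypes[proc] = MPI_CHAR;
    recvcounts[proc] = 0;  recvdispls[proc] = 0;  recvtypes[proc] = MPI_CHAR;
}
sendcounts[0] = localsizes[0]*localsizes[1];

/* rank 0 receives one subarray-typed block from each rank, at the same
   byte displacements that were used for the scatter */
if (rank == 0) {
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);
        recvcounts[proc] = 1;
        recvdispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);
        recvtypes[proc]  = blocktypes[typeIdx(row, col, blocks)];
    }
}

MPI_Alltoallw(&(localdata[0][0]), sendcounts, senddispls, sendtypes,
              globalptr, recvcounts, recvdispls, recvtypes, MPI_COMM_WORLD);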
So another approach is to do something like a two-phase communication.
The simplest way is to note that you can get almost all of the data where it needs to go with a single MPI_Scatterv() call: in your example, if we operate in units of a single column vector, with column = 1 and rows = 3 (the number of rows in most of the blocks of the domain), you can scatter almost all of the global data to the other processors. Each processor gets 3 or 4 of these vectors, which distributes all of the global array except its very last row, which can be handled by a simple second scatterv. That looks like this:
MPI_Datatype vec, localvec;

/* a single column of a normal-sized block, as laid out in the local array... */
MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
MPI_Type_commit(&localvec);

/* ...and as laid out in the global array */
MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
MPI_Type_commit(&vec);

/* rank 0 figures out how many column vectors each process gets, and where they start */
if (rank == 0) {
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);

        sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
        senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
    }
}
recvcounts = localsizes[1];

MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
             &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

MPI_Type_free(&localvec);
if (rank == 0)
    MPI_Type_free(&vec);

/* cleanup: send the last row of the global array to the processes
   in the last block-row */
if (rank == 0) {
    for (int proc=0; proc<size; proc++) {
        int row, col;
        rowcol(proc, blocks, &row, &col);

        sendcounts[proc] = 0;
        senddispls[proc] = 0;
        if ( isLastRow(row,blocks) ) {
            sendcounts[proc] = blocksize;
            senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
            if ( isLastCol(col,blocks) )
                sendcounts[proc] += 1;
        }
    }
}

recvcounts = 0;
if ( isLastRow(myrow, blocks) ) {
    recvcounts = blocksize;
    if ( isLastCol(mycol, blocks) )
        recvcounts++;
}

MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
             &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);
So far so good. But it's a shame to have most of the processors sitting idle during that final "cleanup" scatterv. A nicer approach, then, is to scatter all the rows in a first phase, and scatter that data out along the columns in a second phase. Here we create new communicators, with each processor belonging to two of them - one containing the other processors in the same block row, and one containing those in the same block column. In the first step, the origin processor distributes all the rows of the global array to the other processors in the same column communicator, which can be done in a single scatterv. Then those processors, using a single scatterv and the same column datatypes as in the previous example, scatter the columns to each processor in the same block row. The result is two fairly simple scattervs distributing all of the data:
char **rowdata;
MPI_Comm colComm, rowComm;

/* every rank belongs to a row communicator and a column communicator */
MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

/* phase 1: scatter whole block-rows of the global array down the first
   column of processors */
if (mycol == 0) {
    int sendcounts[ blocks[0] ];
    int senddispls[ blocks[0] ];
    senddispls[0] = 0;

    for (int row=0; row<blocks[0]; row++) {
        sendcounts[row] = blocksize*globalsizes[1];
        if (row > 0)
            senddispls[row] = senddispls[row-1] + sendcounts[row-1];
    }
    /* the last block-row gets one extra row of data */
    sendcounts[blocks[0]-1] += globalsizes[1];

    rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

    MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                 &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);
}

/* phase 2: scatter the block-rows across each row of processors, one
   column vector at a time */
int locnrows = blocksize;
if ( isLastRow(myrow, blocks) )
    locnrows++;

MPI_Datatype vec, localvec;
MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
MPI_Type_commit(&vec);

MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
MPI_Type_commit(&localvec);

int sendcounts[ blocks[1] ];
int senddispls[ blocks[1] ];
/* fill these on every rank, not just the row roots: the receive count,
   sendcounts[mycol], is significant on every process in the Scatterv below */
for (int col=0; col<blocks[1]; col++) {
    sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
    senddispls[col] = col*blocksize;
}
char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
             &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);
This is fairly simple, and probably strikes a reasonably good balance between performance and robustness.
Running all three methods works:
bash-3.2$ mpirun -np 6 ./allmethods alltoall
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - alltoall
Rank 0:
abc
hij
opq
Rank 1:
defg
klmn
rstu
Rank 2:
vwx
cde
jkl
Rank 3:
yzab
fghi
mnop
Rank 4:
qrs
xyz
efg
lmn
Rank 5:
tuvw
abcd
hijk
opqr
bash-3.2$ mpirun -np 6 ./allmethods twophasevecs
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - two phase, vectors, then cleanup
Rank 0:
abc
hij
opq
Rank 1:
defg
klmn
rstu
Rank 2:
vwx
cde
jkl
Rank 3:
yzab
fghi
mnop
Rank 4:
qrs
xyz
efg
lmn
Rank 5:
tuvw
abcd
hijk
opqr
bash-3.2$ mpirun -np 6 ./allmethods twophaserowcol
Global array:
abcdefg
hijklmn
opqrstu
vwxyzab
cdefghi
jklmnop
qrstuvw
xyzabcd
efghijk
lmnopqr
Method - two phase - row, cols
Rank 0:
abc
hij
opq
Rank 1:
defg
klmn
rstu
Rank 2:
vwx
cde
jkl
Rank 3:
yzab
fghi
mnop
Rank 4:
qrs
xyz
efg
lmn
Rank 5:
tuvw
abcd
hijk
opqr
The code implementing all of these methods follows; you can set the block size to something more typical of your problem and run on a realistic number of processors to get some sense of which will be best for your application.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>   /* strcasecmp */
#include "mpi.h"
char **allocchar2darray(int n, int m);
void freechar2darray(char **a);
void printarray(char **data, int n, int m);
void rowcol(int rank, const int blocks[2], int *row, int *col);
int isLastRow(int row, const int blocks[2]);
int isLastCol(int col, const int blocks[2]);
int typeIdx(int row, int col, const int blocks[2]);
void alltoall(const int myrow, const int mycol, const int rank, const int size,
              const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2],
              const char *const globalptr, char **localdata) {
    int sendcounts[ size ];
    int senddispls[ size ];
    MPI_Datatype sendtypes[size];
    int recvcounts[ size ];
    int recvdispls[ size ];
    MPI_Datatype recvtypes[size];

    for (int proc=0; proc<size; proc++) {
        recvcounts[proc] = 0;
        recvdispls[proc] = 0;
        recvtypes[proc] = MPI_CHAR;

        sendcounts[proc] = 0;
        senddispls[proc] = 0;
        sendtypes[proc] = MPI_CHAR;
    }
    recvcounts[0] = localsizes[0]*localsizes[1];
    recvdispls[0] = 0;

    if (rank == 0) {
        MPI_Datatype blocktypes[4];
        int subsizes[2];
        int starts[2] = {0,0};
        for (int i=0; i<2; i++) {
            subsizes[0] = blocksize+i;
            for (int j=0; j<2; j++) {
                subsizes[1] = blocksize+j;
                MPI_Type_create_subarray(2, globalsizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &blocktypes[2*i+j]);
                MPI_Type_commit(&blocktypes[2*i+j]);
            }
        }

        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = 1;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize)*sizeof(char);

            int idx = typeIdx(row, col, blocks);
            sendtypes[proc] = blocktypes[idx];
        }
    }

    MPI_Alltoallw(globalptr, sendcounts, senddispls, sendtypes,
                  &(localdata[0][0]), recvcounts, recvdispls, recvtypes,
                  MPI_COMM_WORLD);
}
void twophasevecs(const int myrow, const int mycol, const int rank, const int size,
                  const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2],
                  const char *const globalptr, char **localdata) {
    int sendcounts[ size ];
    int senddispls[ size ];
    int recvcounts;

    for (int proc=0; proc<size; proc++) {
        sendcounts[proc] = 0;
        senddispls[proc] = 0;
    }

    MPI_Datatype vec, localvec;
    MPI_Type_vector(blocksize, 1, localsizes[1], MPI_CHAR, &localvec);
    MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
    MPI_Type_commit(&localvec);

    MPI_Type_vector(blocksize, 1, globalsizes[1], MPI_CHAR, &vec);
    MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
    MPI_Type_commit(&vec);

    if (rank == 0) {
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
            senddispls[proc] = (row*blocksize*globalsizes[1] + col*blocksize);
        }
    }
    recvcounts = localsizes[1];

    MPI_Scatterv(globalptr, sendcounts, senddispls, vec,
                 &(localdata[0][0]), recvcounts, localvec, 0, MPI_COMM_WORLD);

    MPI_Type_free(&localvec);
    if (rank == 0)
        MPI_Type_free(&vec);

    if (rank == 0) {
        for (int proc=0; proc<size; proc++) {
            int row, col;
            rowcol(proc, blocks, &row, &col);

            sendcounts[proc] = 0;
            senddispls[proc] = 0;
            if ( isLastRow(row,blocks) ) {
                sendcounts[proc] = blocksize;
                senddispls[proc] = (globalsizes[0]-1)*globalsizes[1]+col*blocksize;
                if ( isLastCol(col,blocks) )
                    sendcounts[proc] += 1;
            }
        }
    }

    recvcounts = 0;
    if ( isLastRow(myrow, blocks) ) {
        recvcounts = blocksize;
        if ( isLastCol(mycol, blocks) )
            recvcounts++;
    }

    MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                 &(localdata[blocksize][0]), recvcounts, MPI_CHAR, 0, MPI_COMM_WORLD);
}
void twophaseRowCol(const int myrow, const int mycol, const int rank, const int size,
                    const int blocks[2], const int blocksize, const int globalsizes[2], const int localsizes[2],
                    const char *const globalptr, char **localdata) {
    char **rowdata;

    MPI_Comm colComm, rowComm;
    MPI_Comm_split(MPI_COMM_WORLD, myrow, rank, &rowComm);
    MPI_Comm_split(MPI_COMM_WORLD, mycol, rank, &colComm);

    if (mycol == 0) {
        int sendcounts[ blocks[0] ];
        int senddispls[ blocks[0] ];
        senddispls[0] = 0;

        for (int row=0; row<blocks[0]; row++) {
            sendcounts[row] = blocksize*globalsizes[1];
            if (row > 0)
                senddispls[row] = senddispls[row-1] + sendcounts[row-1];
        }
        sendcounts[blocks[0]-1] += globalsizes[1];

        rowdata = allocchar2darray( sendcounts[myrow], globalsizes[1] );

        MPI_Scatterv(globalptr, sendcounts, senddispls, MPI_CHAR,
                     &(rowdata[0][0]), sendcounts[myrow], MPI_CHAR, 0, colComm);
    }

    int locnrows = blocksize;
    if ( isLastRow(myrow, blocks) )
        locnrows++;

    MPI_Datatype vec, localvec;
    MPI_Type_vector(locnrows, 1, globalsizes[1], MPI_CHAR, &vec);
    MPI_Type_create_resized(vec, 0, sizeof(char), &vec);
    MPI_Type_commit(&vec);

    MPI_Type_vector(locnrows, 1, localsizes[1], MPI_CHAR, &localvec);
    MPI_Type_create_resized(localvec, 0, sizeof(char), &localvec);
    MPI_Type_commit(&localvec);

    int sendcounts[ blocks[1] ];
    int senddispls[ blocks[1] ];
    /* fill these on every rank, not just the row roots: the receive count,
       sendcounts[mycol], is significant on every process in the Scatterv below */
    for (int col=0; col<blocks[1]; col++) {
        sendcounts[col] = isLastCol(col, blocks) ? blocksize+1 : blocksize;
        senddispls[col] = col*blocksize;
    }
    char *rowptr = (mycol == 0) ? &(rowdata[0][0]) : NULL;

    MPI_Scatterv(rowptr, sendcounts, senddispls, vec,
                 &(localdata[0][0]), sendcounts[mycol], localvec, 0, rowComm);

    MPI_Type_free(&localvec);
    MPI_Type_free(&vec);
    if (mycol == 0)
        freechar2darray(rowdata);

    MPI_Comm_free(&rowComm);
    MPI_Comm_free(&colComm);
}
int main(int argc, char **argv) {
    int rank, size;
    int blocks[2] = {0,0};
    const int blocksize=3;
    int globalsizes[2], localsizes[2];
    char **globaldata;
    char *globalptr = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0 && argc < 2) {
        fprintf(stderr,"Usage: %s method\n Where method is one of: alltoall, twophasevecs, twophaserowcol\n", argv[0]);
        MPI_Abort(MPI_COMM_WORLD,1);
    }

    MPI_Dims_create(size, 2, blocks);

    int myrow, mycol;
    rowcol(rank, blocks, &myrow, &mycol);

    globalsizes[0] = blocks[0]*blocksize+1;
    globalsizes[1] = blocks[1]*blocksize+1;
    if (rank == 0) {
        globaldata = allocchar2darray(globalsizes[0], globalsizes[1]);
        globalptr = &(globaldata[0][0]);
        for (int i=0; i<globalsizes[0]; i++)
            for (int j=0; j<globalsizes[1]; j++)
                globaldata[i][j] = 'a'+(i*globalsizes[1] + j)%26;

        printf("Global array: \n");
        printarray(globaldata, globalsizes[0], globalsizes[1]);
    }

    localsizes[0] = blocksize; localsizes[1] = blocksize;
    if ( isLastRow(myrow,blocks)) localsizes[0]++;
    if ( isLastCol(mycol,blocks)) localsizes[1]++;
    char **localdata = allocchar2darray(localsizes[0],localsizes[1]);

    if (!strcasecmp(argv[1], "alltoall")) {
        if (rank == 0) printf("Method - alltoall\n");
        alltoall(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
    } else if (!strcasecmp(argv[1],"twophasevecs")) {
        if (rank == 0) printf("Method - two phase, vectors, then cleanup\n");
        twophasevecs(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
    } else {
        if (rank == 0) printf("Method - two phase - row, cols\n");
        twophaseRowCol(myrow, mycol, rank, size, blocks, blocksize, globalsizes, localsizes, globalptr, localdata);
    }

    for (int proc=0; proc<size; proc++) {
        if (proc == rank) {
            printf("\nRank %d:\n", proc);
            printarray(localdata, localsizes[0], localsizes[1]);
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    freechar2darray(localdata);
    if (rank == 0)
        freechar2darray(globaldata);

    MPI_Finalize();
    return 0;
}
char **allocchar2darray(int n, int m) {
    char **ptrs = malloc(n*sizeof(char *));
    ptrs[0] = malloc(n*m*sizeof(char));
    for (int i=0; i<n*m; i++)
        ptrs[0][i]='.';
    for (int i=1; i<n; i++)
        ptrs[i] = ptrs[i-1] + m;
    return ptrs;
}

void freechar2darray(char **a) {
    free(a[0]);
    free(a);
}

void printarray(char **data, int n, int m) {
    for (int i=0; i<n; i++) {
        for (int j=0; j<m; j++)
            putchar(data[i][j]);
        putchar('\n');
    }
}

void rowcol(int rank, const int blocks[2], int *row, int *col) {
    *row = rank/blocks[1];
    *col = rank % blocks[1];
}

int isLastRow(int row, const int blocks[2]) {
    return (row == blocks[0]-1);
}

int isLastCol(int col, const int blocks[2]) {
    return (col == blocks[1]-1);
}

int typeIdx(int row, int col, const int blocks[2]) {
    int lastrow = (row == blocks[0]-1);
    int lastcol = (col == blocks[1]-1);

    return lastrow*2 + lastcol;
}
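To try these out, compile with your MPI wrapper compiler and run as shown in the sessions above (the file name allmethods.c is just an assumption here; the code uses C99 features such as loop-scope declarations and variable-length arrays):

mpicc -std=c99 -o allmethods allmethods.c
mpirun -np 6 ./allmethods alltoall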
MPI_Type_indexed doesn't apply here, because a single such type can still only describe blocks of one fixed size. - Roun