How do I send blocks of a 2D array to different processors? Suppose the 2D array is 400x400 and I want to send 100x100 blocks to different processors. The idea is that each processor performs its computation on its own block and sends its result back to the first processor, which assembles the final result.
I am using MPI in a C program.
Let's start with the 1D case: say you have a global array of 8 integers, [01234567], that you want spread across 4 tasks. You could have task 0 send four messages (including one to itself) to distribute it, and receive four messages to reassemble it at the end; but that obviously becomes very time-consuming with a large number of processes. There are optimized routines for exactly these operations, the scatter/gather operations. So in this 1D case you would do something like:
int global[8]; /* only task 0 has this */
int local[2]; /* everyone has this */
const int root = 0; /* the processor with the initial global data */
if (rank == root) {
for (int i=0; i<8; i++) global[i] = i;
}
MPI_Scatter(global, 2, MPI_INT, /* send everyone 2 ints from global */
local, 2, MPI_INT, /* each proc receives 2 ints into local */
root, MPI_COMM_WORLD); /* sending process is root, all procs in */
/* MPI_COMM_WORLD participate */
task 0: local:[01] global: [01234567]
task 1: local:[23] global: [garbage-]
task 2: local:[45] global: [garbage-]
task 3: local:[67] global: [garbage-]
That is, the scatter operation splits the global array into contiguous 2-integer chunks and sends one to each processor.
To reassemble the array, we use the MPI_Gather() operation, which works exactly the same way but in reverse:
for (int i=0; i<2; i++)
local[i] = local[i] + rank;
MPI_Gather(local, 2, MPI_INT, /* everyone sends 2 ints from local */
global, 2, MPI_INT, /* root receives 2 ints each proc into global */
root, MPI_COMM_WORLD); /* recv'ing process is root, all procs in */
/* MPI_COMM_WORLD participate */
After which the data looks like this:
task 0: local:[01] global: [0134679a]
task 1: local:[34] global: [garbage-]
task 2: local:[67] global: [garbage-]
task 3: local:[9a] global: [garbage-]
The gather brings all the data back; here 'a' is 10, because I didn't think my formatting through carefully enough when starting this example.
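For reference, here is the 1D example above assembled into one complete program (a minimal sketch, assuming it is run with exactly 4 ranks):
#include <stdio.h>
#include "mpi.h"

int main(int argc, char **argv) {
    int global[8];       /* only meaningful on the root */
    int local[2];        /* everyone has this */
    const int root = 0;
    int rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == root)
        for (int i=0; i<8; i++) global[i] = i;

    /* hand each of the 4 ranks its 2-integer chunk */
    MPI_Scatter(global, 2, MPI_INT, local, 2, MPI_INT, root, MPI_COMM_WORLD);

    /* do the local work */
    for (int i=0; i<2; i++) local[i] = local[i] + rank;

    /* collect the chunks back on the root */
    MPI_Gather(local, 2, MPI_INT, global, 2, MPI_INT, root, MPI_COMM_WORLD);

    if (rank == root) {
        for (int i=0; i<8; i++) printf("%d ", global[i]);
        printf("\n");
    }

    MPI_Finalize();
    return 0;
}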
What if the number of data points doesn't divide evenly by the number of processes, and we need to send a different amount of data to each process? Then you need a generalized version of scatter, MPI_Scatterv(), which lets you specify a count for each processor plus a displacement, that is, where in the global array each piece of data starts. Say you have an array of 9 characters, [abcdefghi], and you want to assign two characters to every process except the last one, which gets three. Then you would need:
char global[9]; /* only task 0 has this */
char local[3]={'-','-','-'}; /* everyone has this */
int mynum; /* how many items */
const int root = 0; /* the processor with the initial global data */
if (rank == 0) {
for (int i=0; i<9; i++) global[i] = 'a'+i;
}
int counts[4] = {2,2,2,3}; /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6}; /* the starting point of everyone's data */
/* in the global array */
MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
MPI_CHAR,
local, mynum, MPI_CHAR, /* I'm receiving mynum MPI_CHARs into local */
root, MPI_COMM_WORLD);
task 0: local:[ab-] global: [abcdefghi]
task 1: local:[cd-] global: [garbage--]
task 2: local:[ef-] global: [garbage--]
task 3: local:[ghi] global: [garbage--]
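The original example stops the 1D discussion here, but for completeness: the reverse of an uneven scatter is MPI_Gatherv(), which takes the same counts and displs on the receiving (root) side. A minimal sketch, reusing the variables above:
/* every rank contributes its mynum chars; the root places piece i at displs[i] */
MPI_Gatherv(local, mynum, MPI_CHAR,
            global, counts, displs, MPI_CHAR,
            root, MPI_COMM_WORLD);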
Now, for two dimensions this is a bit trickier. If we want to send 2D sub-blocks of a 2D array, the data we are sending is no longer contiguous. If we are sending (say) the 3x3 sub-blocks of a 6x6 array to 4 processors, the data we are sending has holes in it:
2D Array
---------
|000|111|
|000|111|
|000|111|
|---+---|
|222|333|
|222|333|
|222|333|
---------
Actual layout in memory
[000111000111000111222333222333222333]
The standard way to describe such a non-contiguous sub-block is with an MPI derived datatype, created with MPI_Type_create_subarray():
MPI_Datatype newtype;
int sizes[2] = {6,6}; /* size of global array */
int subsizes[2] = {3,3}; /* size of sub-region */
int starts[2] = {0,0}; /* let's say we're looking at region "0",
which begins at index [0,0] */
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
MPI_Type_commit(&newtype);
This creates a type which picks out just region "0" of the global array; we can now send that piece of data to another processor:
MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "0" */
and the receiving process can receive it into a local array. Note that if the receiving process is only receiving it into a 3x3 array, it cannot describe what it is receiving as a type of newtype; that no longer describes the memory layout. Instead, it just receives a block of 3*3 = 9 integers:
MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);
Note that we could do this for the other sub-regions too, either by creating a different type (with a different starts array) for the other blocks (a sketch of that alternative follows the sends below), or simply by starting the send at the first element of the particular block:
MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "1" */
MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "2" */
MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "3" */
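The first alternative mentioned above, a separate type with a different starts array, would look roughly like this for region "1" (a sketch; region1 is just an illustrative name, and sizes/subsizes are the arrays defined earlier):
int starts1[2] = {0,3};            /* region "1" begins at row 0, column 3 */
MPI_Datatype region1;
MPI_Type_create_subarray(2, sizes, subsizes, starts1, MPI_ORDER_C, MPI_INT, &region1);
MPI_Type_commit(&region1);
MPI_Send(&(global[0][0]), 1, region1, dest, tag, MPI_COMM_WORLD); /* region "1" */
MPI_Type_free(&region1);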
Finally, note that this requires global and local to be contiguous chunks of memory; that is, &(global[0][0]) and &(local[0][0]) (or, equivalently, *global and *local) must point to contiguous 6*6 and 3*3 blocks of memory. That is not guaranteed by the usual way of allocating dynamic multidimensional arrays; how to do it is shown in the full code below.

We could not just use MPI_Scatter() (or even scatterv) with these types yet, because these types have an extent of 16 integers; that is, where they end is 16 integers after where they start, and where they end does not line up nicely with where the next block begins, so we cannot simply use scatter; it would pick the wrong place to start sending data to the next processor.

Of course, we could use MPI_Scatterv() and specify the displacements ourselves, and that is what we will do; except the displacements are in units of the send type's size, and that doesn't help us either. The blocks start at offsets of (0,3,18,21) integers from the start of the global array, and the fact that a block ends 16 integers after where it starts does not let us express those displacements as integer multiples of that at all.

The way around this is to give the type a different extent, just for the purposes of these calculations, with MPI_Type_create_resized(); here we set the extent to one sub-row (3 integers), so that the displacements can be counted in whole extents:
MPI_Datatype type, resizedtype;
int sizes[2] = {6,6}; /* size of global array */
int subsizes[2] = {3,3}; /* size of sub-region */
int starts[2] = {0,0}; /* let's say we're looking at region "0",
which begins at index [0,0] */
/* as before */
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
/* change the extent of the type */
MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
MPI_Type_commit(&resizedtype);
Note that you do need to call MPI_Type_commit before you can use the type, but you only need to commit the final type you actually use, not any intermediate steps. When you are done, you free the type with MPI_Type_free. Now we can finally scatter the blocks:
int counts[4] = {1,1,1,1}; /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7}; /* the starting point of everyone's data */
/* in the global array, in block extents */
MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
resizedtype,
local, 3*3, MPI_INT, /* I'm receiving 3*3 MPI_INTs into local */
root, MPI_COMM_WORLD);
And with that we are done with a little tour of scatter, gather, and MPI derived types.
What follows is an example code which shows both the gather and the scatter operation with character arrays. Running the program:
$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
And the code follows.
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"
int malloc2dchar(char ***array, int n, int m) {
/* allocate the n*m contiguous items */
char *p = (char *)malloc(n*m*sizeof(char));
if (!p) return -1;
/* allocate the row pointers into the memory */
(*array) = (char **)malloc(n*sizeof(char*));
if (!(*array)) {
free(p);
return -1;
}
/* set up the pointers into the contiguous memory */
for (int i=0; i<n; i++)
(*array)[i] = &(p[i*m]);
return 0;
}
int free2dchar(char ***array) {
/* free the memory - the first element of the array is at the start */
free(&((*array)[0][0]));
/* free the pointers into the memory */
free(*array);
return 0;
}
int main(int argc, char **argv) {
char **global, **local;
const int gridsize=10; // size of grid
const int procgridsize=2; // size of process grid
int rank, size; // rank of current process and no. of processes
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (size != procgridsize*procgridsize) {
fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize*procgridsize);
MPI_Abort(MPI_COMM_WORLD,1);
}
if (rank == 0) {
/* fill in the array, and print it */
malloc2dchar(&global, gridsize, gridsize);
for (int i=0; i<gridsize; i++) {
for (int j=0; j<gridsize; j++)
global[i][j] = '0'+(3*i+j)%10;
}
printf("Global array is:\n");
for (int i=0; i<gridsize; i++) {
for (int j=0; j<gridsize; j++)
putchar(global[i][j]);
printf("\n");
}
}
/* create the local array which we'll process */
malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);
/* create a datatype to describe the subarrays of the global array */
int sizes[2] = {gridsize, gridsize}; /* global size */
int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize}; /* local size */
int starts[2] = {0,0}; /* where this one starts */
MPI_Datatype type, subarrtype;
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
MPI_Type_commit(&subarrtype);
char *globalptr=NULL;
if (rank == 0) globalptr = &(global[0][0]);
/* scatter the array to all processors */
int sendcounts[procgridsize*procgridsize];
int displs[procgridsize*procgridsize];
if (rank == 0) {
for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
int disp = 0;
for (int i=0; i<procgridsize; i++) {
for (int j=0; j<procgridsize; j++) {
displs[i*procgridsize+j] = disp;
disp += 1;
}
disp += ((gridsize/procgridsize)-1)*procgridsize;
}
}
MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
0, MPI_COMM_WORLD);
/* now all processors print their local data: */
for (int p=0; p<size; p++) {
if (rank == p) {
printf("Local process on rank %d is:\n", rank);
for (int i=0; i<gridsize/procgridsize; i++) {
putchar('|');
for (int j=0; j<gridsize/procgridsize; j++) {
putchar(local[i][j]);
}
printf("|\n");
}
}
MPI_Barrier(MPI_COMM_WORLD);
}
/* now each processor has its local array, and can process it */
for (int i=0; i<gridsize/procgridsize; i++) {
for (int j=0; j<gridsize/procgridsize; j++) {
local[i][j] = 'A' + rank;
}
}
/* it all goes back to process 0 */
MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
globalptr, sendcounts, displs, subarrtype,
0, MPI_COMM_WORLD);
/* don't need the local data anymore */
free2dchar(&local);
/* or the MPI data type */
MPI_Type_free(&subarrtype);
if (rank == 0) {
printf("Processed grid:\n");
for (int i=0; i<gridsize; i++) {
for (int j=0; j<gridsize; j++) {
putchar(global[i][j]);
}
printf("\n");
}
free2dchar(&global);
}
MPI_Finalize();
return 0;
}
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"
/*
This is a version with integers, rather than char arrays, presented in this
very good answer: https://dev59.com/Q2ox5IYBdhLWcg3wVS8J#9271753
It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back.
*/
int malloc2D(int ***array, int n, int m) {
int i;
/* allocate the n*m contiguous items */
int *p = malloc(n*m*sizeof(int));
if (!p) return -1;
/* allocate the row pointers into the memory */
(*array) = malloc(n*sizeof(int*));
if (!(*array)) {
free(p);
return -1;
}
/* set up the pointers into the contiguous memory */
for (i=0; i<n; i++)
(*array)[i] = &(p[i*m]);
return 0;
}
int free2D(int ***array) {
/* free the memory - the first element of the array is at the start */
free(&((*array)[0][0]));
/* free the pointers into the memory */
free(*array);
return 0;
}
int main(int argc, char **argv) {
int **global, **local;
const int gridsize=4; // size of grid
const int procgridsize=2; // size of process grid
int rank, size; // rank of current process and no. of processes
int i, j, p;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (size != procgridsize*procgridsize) {
fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize*procgridsize);
MPI_Abort(MPI_COMM_WORLD,1);
}
if (rank == 0) {
/* fill in the array, and print it */
malloc2D(&global, gridsize, gridsize);
int counter = 0;
for (i=0; i<gridsize; i++) {
for (j=0; j<gridsize; j++)
global[i][j] = ++counter;
}
printf("Global array is:\n");
for (i=0; i<gridsize; i++) {
for (j=0; j<gridsize; j++) {
printf("%2d ", global[i][j]);
}
printf("\n");
}
}
/* create the local array which we'll process */
malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize);
/* create a datatype to describe the subarrays of the global array */
int sizes[2] = {gridsize, gridsize}; /* global size */
int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize}; /* local size */
int starts[2] = {0,0}; /* where this one starts */
MPI_Datatype type, subarrtype;
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype);
MPI_Type_commit(&subarrtype);
int *globalptr=NULL;
if (rank == 0)
globalptr = &(global[0][0]);
/* scatter the array to all processors */
int sendcounts[procgridsize*procgridsize];
int displs[procgridsize*procgridsize];
if (rank == 0) {
for (i=0; i<procgridsize*procgridsize; i++)
sendcounts[i] = 1;
int disp = 0;
for (i=0; i<procgridsize; i++) {
for (j=0; j<procgridsize; j++) {
displs[i*procgridsize+j] = disp;
disp += 1;
}
disp += ((gridsize/procgridsize)-1)*procgridsize;
}
}
MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
0, MPI_COMM_WORLD);
/* now all processors print their local data: */
for (p=0; p<size; p++) {
if (rank == p) {
printf("Local process on rank %d is:\n", rank);
for (i=0; i<gridsize/procgridsize; i++) {
putchar('|');
for (j=0; j<gridsize/procgridsize; j++) {
printf("%2d ", local[i][j]);
}
printf("|\n");
}
}
MPI_Barrier(MPI_COMM_WORLD);
}
/* now each processor has its local array, and can process it */
for (i=0; i<gridsize/procgridsize; i++) {
for (j=0; j<gridsize/procgridsize; j++) {
local[i][j] += 1; // increase by one the value
}
}
/* it all goes back to process 0 */
MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
globalptr, sendcounts, displs, subarrtype,
0, MPI_COMM_WORLD);
/* don't need the local data anymore */
free2D(&local);
/* or the MPI data type */
MPI_Type_free(&subarrtype);
if (rank == 0) {
printf("Processed grid:\n");
for (i=0; i<gridsize; i++) {
for (j=0; j<gridsize; j++) {
printf("%2d ", global[i][j]);
}
printf("\n");
}
free2D(&global);
}
MPI_Finalize();
return 0;
}
Output:
linux16:>mpicc -o main main.c
linux16:>mpiexec -n 4 main
Global array is:
1 2 3 4
5 6 7 8
9 10 11 12
13 14 15 16
Local process on rank 0 is:
| 1 2 |
| 5 6 |
Local process on rank 1 is:
| 3 4 |
| 7 8 |
Local process on rank 2 is:
| 9 10 |
|13 14 |
Local process on rank 3 is:
|11 12 |
|15 16 |
Processed grid:
2 3 4 5
6 7 8 9
10 11 12 13
14 15 16 17
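Finally, mapping this back to the original question: for a 400x400 array split into 100x100 blocks, the same pattern applies with gridsize=400 and procgridsize=4, that is, 16 processes. A sketch of the type setup under that assumption (global array of ints):
int sizes[2]    = {400, 400};   /* global size */
int subsizes[2] = {100, 100};   /* one block */
int starts[2]   = {0, 0};

MPI_Datatype type, subarrtype;
MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
/* resize the extent to one sub-row (100 ints) so displacements can be counted in blocks */
MPI_Type_create_resized(type, 0, 100*sizeof(int), &subarrtype);
MPI_Type_commit(&subarrtype);

/* as in the examples above: sendcounts[i] = 1 for each of the 16 ranks,
   displs[i*4+j] = i*400 + j, and each rank receives 100*100 MPI_INTs */
Each rank then works on its 100x100 block, and the results come back to rank 0 with the matching MPI_Gatherv, exactly as in the examples above.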