使用MPI在C中发送2D数组的块

55

如何将2D数组的块发送到不同的处理器? 假设2D数组的大小为400x400,我想将100X100大小的块发送到不同的处理器。 这个想法是每个处理器将在其单独的块上执行计算,并将其结果发送回第一个处理器以获得最终结果。
我在C程序中使用MPI。

2个回答

158
让我先说一下,通常情况下你并不想这样做——从“主”进程散布和收集大块数据。通常情况下,你希望每个任务都在解决自己的问题,而且你应该尽量避免一个处理器需要整个数据的“全局视图”;一旦你需要这样做,就会限制可扩展性和问题规模。如果你是为了I/O而这样做——一个进程读取数据,然后将其散布,再将其收集回来进行写入,那么你最终需要考虑使用MPI-IO。
不过,回到你的问题,MPI有非常好的方法可以从内存中提取任意数据,并将其散布/收集到一组处理器中。不幸的是,这需要相当多的MPI概念——MPI类型、范围和集合操作。很多基本思想在这个问题的答案中都有所讨论——MPI_Type_create_subarray和MPI_Gather更新——在清晨的阳光下,这是很多代码而不是很多解释。所以让我稍微扩展一下。
考虑一个1D整数全局数组,任务0拥有它,你想把它分发给一些MPI任务,使它们每个人都在他们的本地数组中得到一个部分。假设你有4个任务,全局数组是[01234567]。你可以让任务0发送四个消息(包括一个给自己),来进行分发,并在重新组合时接收四个消息;但这显然在大量进程下会变得非常耗时。对于这些操作,有优化的例程——散布/收集操作。因此,在这种1D情况下,你需要做的是:
int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<7; i++) global[i] = i;
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

在此之后,处理器的数据看起来会像这样:
task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

也就是说,scatter操作将全局数组分成连续的2个整数块,并将它们发送到所有处理器。

为了重新组合数组,我们使用MPI_Gather()操作,其工作方式完全相同,但是反向执行:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

现在数据看起来像这样

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather将所有数据收集回来,这里a的值为10,因为我在开始这个例子时没有仔细考虑我的格式。

如果数据点的数量不能被进程数整除,我们需要向每个进程发送不同数量的数据怎么办?那么你需要一个广义版本的scatter,MPI_Scatterv(),它可以让你指定每个处理器的计数和位移——即该数据片段在全局数组中的起始位置。假设你有一个包含9个字符的字符数组[abcdefghi],你要分配给每个进程两个字符,最后一个进程得到三个字符。那么你就需要:

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<8; i++) global[i] = 'a'+i;
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);

现在的数据看起来像这样:
task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

您现在使用scatterv来分发不规则数量的数据。在每种情况下,位移量为从数组开头开始的两倍*秩(以字符为单位测量;对于散布或聚集发送的类型单位,位移量通常不是字节或其他单位),计数为{2,2,2,3}。如果我们想要第一个处理器有3个字符,我们会设置counts={3,2,2,2},位移量将为{0,3,5,7}。Gatherv的工作方式完全相同,但是反向。计数和位移量数组将保持不变。

现在,对于二维数组,这有点棘手。如果我们想要发送2D数组的2D子块,则我们现在发送的数据不再是连续的。如果我们将6x6数组的3x3子块发送到4个处理器(例如),我们正在发送的数据中有空洞:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(请注意,所有高性能计算最终都归结为理解内存中数据的布局。)
如果我们想将标记为“1”的数据发送到任务1,则需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值。 第二个复杂之处在于子区域的停止和开始位置;请注意,区域“1”不是从区域“0”的结束位置开始;在区域“0”的最后一个元素之后,内存中的下一个位置部分位于区域“1”中。
让我们先解决第一个布局问题,即如何仅提取要发送的数据。 我们可以将所有“0”区域数据复制到另一个连续数组中,并发送该数组;如果我们足够仔细地计划,甚至可以以这样的方式进行操作,以便我们可以对结果调用MPI_Scatter。 但是,我们不想以那种方式转置整个主数据结构。
到目前为止,我们使用的所有MPI数据类型都是简单的MPI_INT - 它指定了(例如)一行中的4个字节。 但是,MPI允许您创建自己的数据类型,描述内存中任意复杂的数据布局。 而这种情况 - 数组的矩形子区域 - 是如此常见,以至于有一个专门的调用用于它。 对于我们上面描述的二维情况,可以使用特定的MPI函数处理。
    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

这将创建一个类型,仅从全局数组中选择区域“0”;现在我们可以将该数据片段发送到另一个处理器。

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

接收进程可以将其接收到本地数组中。请注意,如果接收进程只将其接收到一个3x3数组中,则无法将其描述为 newtype 类型;那不再描述内存布局。相反,它只是接收一个3 * 3 = 9个整数的块:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);

请注意,我们也可以为其他子区域执行此操作,方法是创建不同类型(具有不同的start数组)的其他块,或仅通过在特定块的起始点发送来实现。
    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

最后,需要注意的是我们在这里要求全局和本地是连续的内存块;也就是说,&(global[0][0])&(local[0][0])(或者等价地,*global*local)指向连续的6*6和3*3内存块;这不是通过通常的动态多维数组分配方式所保证的。下面展示了如何做到这一点。
现在我们已经了解了如何指定子区域,在使用scatter/gather操作之前只剩下一件事情需要讨论,那就是这些类型的“大小”。我们现在还不能使用MPI_Scatter()(甚至是scatterv),因为这些类型的范围是16个整数;也就是说,它们结束的地方比它们开始的地方晚了16个整数——而且它们结束的地方与下一个块开始的地方不对齐,所以我们不能直接使用scatter——它会选择错误的位置开始发送数据到下一个处理器。
当然,我们可以使用MPI_Scatterv()并自己指定位移,这就是我们将要做的——除了位移是以发送类型大小的单位表示的,这也没有帮助;这些块从全局数组的起始处偏移(0,3,18,21)个整数开始,事实上块结束于距离它开始的地方16个整数的位置,并不能让我们以整数倍的方式表示这些位移。
为了解决这个问题,MPI允许您设置类型的范围以进行这些计算。它不截断类型;它只是用于确定给定上一个元素后下一个元素开始的位置。对于这些具有空洞的类型,通常很方便将范围设置为小于实际类型末尾内存距离的值。
我们可以将范围设置为任何方便我们的值。我们可以将范围设置为1个整数,然后按整数单位设置位移。在这种情况下,我喜欢将范围设置为3个整数——子行的大小——这样,块“1”紧随块“0”之后开始,块“3”紧随块“2”之后开始。不幸的是,从块“2”跳转到块“3”时并没有那么好的效果,但这是无法避免的。
因此,在这种情况下,我们要分散子块,需要执行以下操作:
    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

在这里,我们创建了与之前相同的块类型,但我们已经调整了大小;我们没有改变类型“开始”的位置(0),但我们已经改变了它“结束”的位置(3个整数)。我们之前没有提到过,但是需要使用MPI_Type_commit才能使用该类型;但您只需要提交实际使用的最终类型,而不是任何中间步骤。当完成时,使用MPI_Type_free释放类型。
现在,最后,我们可以分散块:上面的数据操作有点复杂,但一旦完成,分散就像以前一样。
int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);

现在我们完成了对scatter、gather和MPI派生类型的简要介绍。

以下是一个示例代码,展示了使用字符数组进行gather和scatter操作。运行程序:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

并且代码如下。

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}

7
这个问题一遍又一遍地出现在这里的某些版本中,我希望写一个答案,我们可以把人们指向它。但还是谢谢 :) - Jonathan Dursi
我非常熟练地掌握Fortran MPI,但将其收藏以备将来参考。同时,我也支持mort的评论。 - milancurcic
整个过程在Fortran中更容易,因为多维数组已经内置于语言中;而C语言则选择不包含此功能。而且你们两个在Stack Overflow上已经给出了相当强有力的答案... - Jonathan Dursi
我想知道使用MPI解决这个问题的最佳方法是什么。 - Sundevil
我已经在https://dev59.com/G10b5IYBdhLWcg3wUP4X上添加了赏金,希望能找到块大小不同的情况下的答案。 - Flash
显示剩余2条评论

1
我只是发现这样检查更容易。
#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

/*
 This is a version with integers, rather than char arrays, presented in this
 very good answer: https://dev59.com/Q2ox5IYBdhLWcg3wVS8J#9271753
 It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back.
*/

int malloc2D(int ***array, int n, int m) {
    int i;
    /* allocate the n*m contiguous items */
    int *p = malloc(n*m*sizeof(int));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = malloc(n*sizeof(int*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2D(int ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    int **global, **local;
    const int gridsize=4; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes
    int i, j, p;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2D(&global, gridsize, gridsize);
        int counter = 0;
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++)
                global[i][j] = ++counter;
        }


        printf("Global array is:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }
    }
    //return;

    /* create the local array which we'll process */
    malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */
    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype);
    MPI_Type_commit(&subarrtype);

    int *globalptr=NULL;
    if (rank == 0)
        globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (i=0; i<procgridsize*procgridsize; i++)
            sendcounts[i] = 1;
        int disp = 0;
        for (i=0; i<procgridsize; i++) {
            for (j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (j=0; j<gridsize/procgridsize; j++) {
                    printf("%2d ", local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (i=0; i<gridsize/procgridsize; i++) {
        for (j=0; j<gridsize/procgridsize; j++) {
            local[i][j] += 1; // increase by one the value
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_INT,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2D(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }

        free2D(&global);
    }


    MPI_Finalize();

    return 0;
}

输出:

linux16:>mpicc -o main main.c
linux16:>mpiexec -n 4 main Global array is:
 1  2  3  4
 5  6  7  8
 9 10 11 12
13 14 15 16
Local process on rank 0 is:
| 1  2 |
| 5  6 |
Local process on rank 1 is:
| 3  4 |
| 7  8 |
Local process on rank 2 is:
| 9 10 |
|13 14 |
Local process on rank 3 is:
|11 12 |
|15 16 |
Processed grid:
 2  3  4  5
 6  7  8  9
10 11 12 13
14 15 16 17

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接