在MPI中将二维数组的分布块发送给根进程

3
我有一个2D数组,它分布在MPI进程网格中(在此示例中为3 x 2个进程)。数组的值是在该块数组分布到的进程内生成的,我想将所有这些块聚集到根进程以显示它们。
到目前为止,我有以下代码。这将生成笛卡尔通信器,找出MPI进程的坐标并计算出基于此应该获得多少数组(因为数组不需要是笛卡尔网格大小的倍数)。然后,我创建了一个新的MPI派生数据类型,它将整个进程的子数组作为一个项目发送(也就是说,每个进程的步幅、块长度和计数都不同,因为每个进程具有不同大小的数组)。但是,当我使用MPI_Gather将数据聚集在一起时,我会遇到分段错误。
我认为这是因为我不应该在MPI_Gather调用中使用相同的数据类型进行发送和接收。数据类型对于发送数据来说是正确的,因为它具有正确的计数、步幅和块长度,但是当它到达另一端时,它将需要非常不同的派生数据类型。我不确定如何计算此数据类型的参数 - 有人有任何想法吗?
此外,如果我完全从错误的角度来处理此问题,请告诉我!
#include<stdio.h>
#include<array_alloc.h>
#include<math.h>
#include<mpi.h>

int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    int A = 2;
    int B = 3;
    MPI_Comm cart_comm;
    MPI_Datatype block_type;
    int coords[2];

    float **array;
    float **whole_array;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int x_start, x_finish;
    int y_start, y_finish;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }

    /* Create the cartesian communicator */
    dim_size[0] = B;
    dim_size[1] = A;
    periods[0] = 1;
    periods[1] = 1;

    MPI_Cart_create(MPI_COMM_WORLD, 2, dim_size, periods, 1, &cart_comm);

    /* Get our co-ordinates within that communicator */
    MPI_Cart_coords(cart_comm, rank, 2, coords);

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);

    if (coords[0] == (B - 1))
    {
        /* We're at the far end of a row */
        cols_per_core = n - (cols_per_core * (B - 1));
    }
    if (coords[1] == (A - 1))
    {
        /* We're at the bottom of a col */
        rows_per_core = n - (rows_per_core * (A - 1));
    }

    printf("X: %d, Y: %d, RpC: %d, CpC: %d\n", coords[0], coords[1], rows_per_core, cols_per_core);

    MPI_Type_vector(rows_per_core, cols_per_core, cols_per_core + 1, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);

    array = alloc_2d_float(rows_per_core, cols_per_core);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        return 1;
    }

    for (j = 0; j < rows_per_core; j++)
    {
        for (i = 0; i < cols_per_core; i++)
        {
            array[j][i] = (float) (i + 1);
        }
    }

    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Gather(array, 1, block_type, whole_array, 1, block_type, 0, MPI_COMM_WORLD);

    /*
    if (rank == 0)
    {
        for (j = 0; j < n; j ++)
        {
            for (i = 0; i < n; i++)
            {
                printf("%f ", whole_array[j][i]);
            }
            printf("\n");
        }
    }
    */
    /* Close down the MPI environment */
    MPI_Finalize();
}

我使用的2D数组分配程序实现如下:

float **alloc_2d_float( int ndim1, int ndim2 ) {

  float **array2 = malloc( ndim1 * sizeof( float * ) );

  int i;

  if( array2 != NULL ){

    array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );

    if( array2[ 0 ] != NULL ) {

      for( i = 1; i < ndim1; i++ )
    array2[i] = array2[0] + i * ndim2;

    }

    else {
      free( array2 );
      array2 = NULL;
    }

  }

  return array2;

}

你的二维数组是如何分配的?你能发一下alloc_2d_float的实现吗? - suszterpatt
啊,对不起,我使用了一个朋友提供的库例程,并忘记提供那段代码。我已经更新了问题并加入了那段代码。 - robintw
3个回答

4

这是一个棘手的问题。你走在正确的轨道上,是的,你需要不同类型来发送和接收。

发送部分很容易--如果你要发送整个子数组array,那么你甚至不需要使用向量类型;你可以发送整个连续的(rows_per_core)*(cols_per_core)个浮点数,从&(array[0][0])(或者如果你喜欢,可以用array[0])开始。

接收部分是棘手的部分,正如你所了解的那样。让我们从最简单的情况开始--假设一切都能被平均分割,因此所有块的大小都相同。然后,你可以使用非常有帮助的MPI_Type_create_subarray(你总是可以用向量类型拼凑它,但对于高维数组而言,这变得很繁琐,因为你需要为除了最后一个维度之外的每个维度创建1个中间类型...

此外,你可以使用同样有用的 MPI_Dims_create 来创建一个尽可能接近正方形的进程分解,而不是硬编码分解。请注意,这并不一定与 MPI_Cart_create 有任何关系,尽管你可以将其用于所需的维度。我将跳过 cart_create 部分,不是因为它没有用处,而是因为我想专注于 gather 部分。
如果每个进程都有相同大小的 array,那么根进程将从每个进程接收相同的数据类型,并且可以使用非常简单的子数组类型来获取它们的数据:
MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts,
                         MPI_ORDER_C, MPI_FLOAT, &block_type);
MPI_Type_commit(&block_type);

其中sub_array_size[] = {每个核心的行数, 每个核心的列数}whole_array_size[] = {n,n},对于这里,starts[]={0,0} - 例如,我们假设一切都从起点开始。

之所以如此,是因为我们可以使用Gatherv来明确地设置数组中的位移:

for (int i=0; i<size; i++) {
    counts[i] = 1;   /* one block_type per rank */

    int row = (i % A);
    int col = (i / A);
    /* displacement into the whole_array */
    disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
}

MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT,
            recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

现在每个人都将他们的数据发送到一个块中,然后将其接收到数组的正确部分。为了使此工作正常,我已经调整了类型的大小,使其范围仅为一个浮点数,因此可以在该单位中计算位移:

MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
MPI_Type_commit(&resized_type);

整个代码如下:
#include<stdio.h>
#include<stdlib.h>
#include<math.h>
#include<mpi.h>

float **alloc_2d_float( int ndim1, int ndim2 ) {
    float **array2 = malloc( ndim1 * sizeof( float * ) );
    int i;

    if( array2 != NULL ){
        array2[0] = malloc( ndim1 * ndim2 * sizeof( float ) );
        if( array2[ 0 ] != NULL ) {
            for( i = 1; i < ndim1; i++ )
                array2[i] = array2[0] + i * ndim2;
        }

        else {
            free( array2 );
            array2 = NULL;
        }
    }
    return array2;
}

void free_2d_float( float **array ) {
    if (array != NULL) {
        free(array[0]);
        free(array);
    }
    return;
}

void init_array2d(float **array, int ndim1, int ndim2, float data) {
    for (int i=0; i<ndim1; i++) 
        for (int j=0; j<ndim2; j++)
            array[i][j] = data;
    return;
}

void print_array2d(float **array, int ndim1, int ndim2) {
    for (int i=0; i<ndim1; i++) {
        for (int j=0; j<ndim2; j++) {
            printf("%6.2f ", array[i][j]);
        }
        printf("\n");
    }
    return;
}


int main(int argc, char ** argv)
{
    int size, rank;
    int dim_size[2];
    int periods[2];
    MPI_Datatype block_type, resized_type;

    float **array;
    float **whole_array;
    float *recvptr;

    int *counts, *disps;

    int n = 10;
    int rows_per_core;
    int cols_per_core;
    int i, j;

    int whole_array_size[2];
    int sub_array_size[2];
    int starts[2];
    int A, B;

    /* Initialise MPI */
    MPI_Init(&argc, &argv);

    /* Get the rank for this process, and the number of processes */
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
        /* If we're the master process */
        whole_array = alloc_2d_float(n, n);
        recvptr = &(whole_array[0][0]);

        /* Initialise whole array to silly values */
        for (i = 0; i < n; i++)
        {
            for (j = 0; j < n; j++)
            {
                whole_array[i][j] = 9999.99;
            }
        }

        print_array2d(whole_array, n, n);
        puts("\n\n");
    }

    /* Create the cartesian communicator */
    MPI_Dims_create(size, 2, dim_size);
    A = dim_size[1];
    B = dim_size[0];
    periods[0] = 1;
    periods[1] = 1;

    rows_per_core = ceil(n / (float) A);
    cols_per_core = ceil(n / (float) B);
    if (rows_per_core*A != n) {
        if (rank == 0) fprintf(stderr,"Aborting: rows %d don't divide by %d evenly\n", n, A);
        MPI_Abort(MPI_COMM_WORLD,1);
    }
    if (cols_per_core*B != n) {
        if (rank == 0) fprintf(stderr,"Aborting: cols %d don't divide by %d evenly\n", n, B);
        MPI_Abort(MPI_COMM_WORLD,2);
    }

    array = alloc_2d_float(rows_per_core, cols_per_core);
    printf("%d, RpC: %d, CpC: %d\n", rank, rows_per_core, cols_per_core);

    whole_array_size[0] = n;             
    sub_array_size  [0] = rows_per_core; 
    whole_array_size[1] = n;
    sub_array_size  [1] = cols_per_core;
    starts[0] = 0; starts[1] = 0;

    MPI_Type_create_subarray(2, whole_array_size, sub_array_size, starts, 
                             MPI_ORDER_C, MPI_FLOAT, &block_type);
    MPI_Type_commit(&block_type);
    MPI_Type_create_resized(block_type, 0, 1*sizeof(float), &resized_type);
    MPI_Type_commit(&resized_type);

    if (array == NULL)
    {
        printf("Problem with array allocation.\nExiting\n");
        MPI_Abort(MPI_COMM_WORLD,3);
    }

    init_array2d(array,rows_per_core,cols_per_core,(float)rank);

    counts = (int *)malloc(size * sizeof(int));
    disps  = (int *)malloc(size * sizeof(int));
    /* note -- we're just using MPI_COMM_WORLD rank here to
     * determine location, not the cart_comm for now... */
    for (int i=0; i<size; i++) {
        counts[i] = 1;   /* one block_type per rank */

        int row = (i % A);
        int col = (i / A);
        /* displacement into the whole_array */
        disps[i] = (col*cols_per_core + row*(rows_per_core)*n);
    }

    MPI_Gatherv(array[0], rows_per_core*cols_per_core, MPI_FLOAT, 
                recvptr, counts, disps, resized_type, 0, MPI_COMM_WORLD);

    free_2d_float(array);
    if (rank == 0) print_array2d(whole_array, n, n);
    if (rank == 0) free_2d_float(whole_array);
    MPI_Finalize();
}

小事一桩——你在gather之前不需要栅栏。实际上,你很少真正需要栅栏,而且它们是昂贵的操作,有很多原因可以隐藏问题——我的经验法则是:除非你确切地知道为什么需要打破规则,否则永远不要使用栅栏。特别是在这种情况下,集体聚集例程与栅栏完全相同,所以只需使用它。
现在,进入更难的部分。如果事情不能被平均分配,你有几个选择。最简单的,虽然不一定是最好的,就是填充数组,使其能够被平均分配,即使仅针对此操作。
如果你可以安排好列数可以被平均分配,即使行数不能,那么你仍然可以使用gatherv,并为每一行的每个部分创建一个向量类型,并从每个处理器中收集适当数量的行。那样会很好用。
如果你确定既不能计算行数也不能计算列数,并且不能填充数据进行发送,那么我可以看到三个子选项:
  • 正如susterpatt建议的那样,使用点对点通信。对于少量任务来说,这种方法还不错,但是随着任务数量增加,这种方法的效率会显著降低,比起集体操作。
  • 创建一个由所有不在边缘的处理器组成的通信器,然后使用上述代码精确地收集它们的代码;然后点对点传输边缘任务的数据。
  • 根本不需要将数据收集到0号进程中;使用分布式数组类型描述数组的布局,并使用MPI-IO将所有数据写入文件;完成后,如果需要,可以让0号进程显示数据。

1

看起来你 MPI_Gather 调用的第一个参数应该是 array[0],而不是 array

此外,如果您需要从每个排名获取不同数量的数据,则最好使用 MPI_Gatherv

最后,注意将所有数据集中在一个地方以进行输出在许多情况下都不可扩展。随着数据量的增长,最终会超过rank 0可用的内存。您可能更好地分发输出工作(如果正在写入文件,则使用MPI IO或其他库调用),或者一次性向rank 0发送点对点发送,以限制总内存消耗。

另一方面,我不建议协调每个rank依次打印到标准输出,因为一些主要的MPI实现不能保证标准输出会按顺序生成。特别是Cray的MPI,如果多个rank同时打印,则会彻底搞乱标准输出。


0

根据this(由我强调):

集合操作的类型匹配条件比点对点之间的发送器和接收器更为严格。换句话说,对于集合操作,发送的数据量必须与接收方指定的数据量完全相匹配。在发送方和接收方之间具有不同的类型映射仍然是允许的。

听起来你有两个选择:

  1. 填充较小的子矩阵,使所有进程发送相同数量的数据,然后在 Gather 后将矩阵裁剪回其原始大小。如果你感到冒险,可以尝试定义接收类型映射,以便在 Gather 操作期间自动覆盖填充,从而消除之后需要裁剪的需要。但这可能会变得有些复杂。
  2. 退回到点对点通信。更直接,但可能存在更高的通信成本。

就我个人而言,我会选择第二个选项。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接