MPI-3共享内存用于数组结构

3

我有一个简单的C++结构体,基本上包装了一个标准的C数组:

struct MyArray {
    T* data;
    int length;
    // ...
}

其中,T 是像 floatdouble 这样的数字类型。 length 是数组中元素的数量。通常我的数组非常大(成千上万甚至数百万个元素)。

我有一个 MPI 程序,我想通过 MPI 3 共享内存将两个 MyArray 实例暴露为共享内存对象。假设它们是 a_olda_new。每个 MPI 排名从 a_old 中读取数据。然后,每个 MPI 排名将写入到 a_new 的某些索引中(每个排名只写入自己的一组索引 - 没有重叠)。最后,在所有排名上必须设置 a_old = a_newa_olda_new 的大小相同。目前,我正在通过使用 Isend/Irecv 将每个排名的更新值与其他排名同步来使我的代码正常工作。但是,由于数据访问模式,我没有理由需要承担消息传递的开销,而可以只有一个共享内存对象,并在 a_old = a_new 之前放置一个屏障。我认为这会给我更好的性能(如果我错了,请纠正我)。

我一直有困难找到使用 MPI 3 进行共享内存的完整代码示例。大多数网站只提供参考文献或不完整的片段。能否有人向我演示一个简单而完整的代码示例,以实现我试图实现的事情(通过 MPI 共享内存更新和同步数字数组)?我了解创建共享内存通信器和窗口、设置栅栏等主要概念,但看到一个将它们全部结合起来的示例会对我的理解非常有帮助。

此外,我应该提到,我只会在一个节点上运行我的代码,因此我不需要担心需要在节点之间复制我的共享内存对象;我只需要为运行我的 MPI 进程的单个节点拥有一份数据副本。尽管如此,在这种情况下,其他解决方案(如 OpenMP)对我来说并不可行,因为我有大量的 MPI 代码,无法为了分享一两个数组而重写所有代码。

2个回答

8

使用MPI-3共享内存相对简单。

首先,您需要使用MPI_Win_allocate_shared来分配共享内存窗口:

MPI_Win win;
MPI_Aint size;
void *baseptr;

if (rank == 0)
{
   size = 2 * ARRAY_LEN * sizeof(T);
   MPI_Win_allocate_shared(size, sizeof(T), MPI_INFO_NULL,
                           MPI_COMM_WORLD, &baseptr, &win);
}
else
{
   int disp_unit;
   MPI_Win_allocate_shared(0, sizeof(T), MPI_INFO_NULL,
                           MPI_COMM_WORLD, &baseptr, &win);
   MPI_Win_shared_query(win, 0, &size, &disp_unit, &baseptr);
}
a_old.data = baseptr;
a_old.length = ARRAY_LEN;
a_new.data = a_old.data + ARRAY_LEN;
a_new.length = ARRAY_LEN;

在这里,只有等级0分配内存。由于它是共享的,因此哪个进程分配它并不重要。甚至可以让每个进程分配一部分内存,但由于默认情况下分配是连续的,因此两种方法是等效的。MPI_Win_shared_query然后由所有其他进程使用,以查找共享内存块开头在其虚拟地址空间中的位置。该地址可能因等级而异,因此不应传递绝对指针。
现在,您可以简单地从a_old.data和a_new.data中加载和存储数据。由于您的情况下等级在不同的内存位置集上工作,因此您不需要锁定窗口。使用窗口锁来实现例如a_old的受保护初始化或需要同步的其他操作。您还可能需要明确告诉编译器不要重新排序代码,并发出内存栅栏,以使所有未完成的加载/存储操作在调用MPI_Barrier()之前完成。
代码a_old = a_new表明将一个数组复制到另一个数组中。相反,您可以简单地交换数据指针,最终交换大小字段。由于仅数组的数据位于共享内存块中,因此交换指针是本地操作,即无需同步。假设两个数组长度相等:
T *temp;
temp = a_old.data;
a_old.data = a_new.data;
a_new.data = temp;

在继续之前,您仍需要一个屏障,以确保所有其他进程已完成处理。

最后,只需释放窗口:

MPI_Win_free(&win);

一个完整的示例(使用C语言)如下所示:
#include <stdio.h>
#include <mpi.h>

#define ARRAY_LEN 1000

int main (void)
{
   MPI_Init(NULL, NULL);

   int rank, nproc;
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &nproc);

   MPI_Win win;
   MPI_Aint size;
   void *baseptr;

   if (rank == 0)
   {
      size = ARRAY_LEN * sizeof(float);
      MPI_Win_allocate_shared(size, sizeof(int), MPI_INFO_NULL,
                              MPI_COMM_WORLD, &baseptr, &win);
   }
   else
   {
      int disp_unit;
      MPI_Win_allocate_shared(0, sizeof(int), MPI_INFO_NULL,
                              MPI_COMM_WORLD, &baseptr, &win);
      MPI_Win_shared_query(win, 0, &size, &disp_unit, &baseptr);
   }

   printf("Rank %d, baseptr = %p\n", rank, baseptr);

   int *arr = baseptr;
   for (int i = rank; i < ARRAY_LEN; i += nproc)
     arr[i] = rank;

   MPI_Barrier(MPI_COMM_WORLD);

   if (rank == 0)
   {
      for (int i = 0; i < 10; i++)
         printf("%4d", arr[i]);
      printf("\n");
   }

   MPI_Win_free(&win);

   MPI_Finalize();
   return 0;
}

免责声明:这里的内容仅供参考,我对MPI的RMA理解还不是很深刻。


这对我目前处理类似情况非常有帮助。您是否熟悉Fortran实现类似代码的方式? - Rain
@Rain 在Fortran中的工作方式相同。唯一的非平凡差异是,您需要声明一个Fortran指针,并使用类似于从Fortran标准iso_c_binding模块中的c_f_pointer()函数将其与由MPI_Win_allocate_shared返回的基本指针地址关联起来。 - Hristo Iliev
谢谢!我仍然有一些关于Fortran实现的问题,即如何存储和索引用户定义的数据类型(而不是数组类型)。请参见以下问题:https://stackoverflow.com/questions/68369535/using-mpi-to-share-bulk-data-between-processes。在C中,我找到了使用指针算术的方法。在Fortran中,我想知道是否存在类似的代码? - Rain

4
这里有一段代码可以满足您的描述。在注释中,我放了一些关于代码的小描述。通常它呈现一个动态的RMA窗口,需要分配内存并添加到窗口中。

MPI_Win_lock_all(0, win) Open MPI文档中的描述:

开始对win中所有进程进行RMA访问纪元,锁定类型为MPI_LOCK_SHARED。在此期间,调用进程可以通过使用RMA操作访问win中所有进程的窗口内存。

在我使用MPI_INFO_NULL时,您可以使用MPI_Info对象提供额外的信息给MPI,但它取决于您的内存访问模式。
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

typedef struct MyArray {
    double* data;
    int length;
}MyArray;

#define ARRAY_SIZE 10

int main(int argc, char *argv[]) {
    int rank, worldSize, i;
    MPI_Win win;
    MPI_Aint disp;
    MPI_Aint *allProcessDisp;
    MPI_Request *requestArray;

    MyArray myArray;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &worldSize);

    MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    allProcessDisp = malloc(sizeof(MPI_Aint) * worldSize);

    requestArray = malloc(sizeof(MPI_Request) * worldSize);
    for (i = 0; i < worldSize; i++) 
        requestArray[i] = MPI_REQUEST_NULL;

    myArray.data = malloc(sizeof(double) * ARRAY_SIZE);
    myArray.length = ARRAY_SIZE;

    //Allocating memory for each process share window space 
    MPI_Alloc_mem(sizeof(double) * ARRAY_SIZE, MPI_INFO_NULL, &myArray.data);
    for (i = 0; i < ARRAY_SIZE; i++)
        myArray.data[i] = rank;

    //attach the allocating memory to each process share window space 
    MPI_Win_attach(win, myArray.data, sizeof(double) * ARRAY_SIZE);

    MPI_Get_address(myArray.data, &disp);

    if (rank == 0) {
        allProcessDisp[0] = disp;
        //Collect all displacements
        for (i = 1; i < worldSize; i++) {
            MPI_Irecv(&allProcessDisp[i], 1, MPI_AINT, i, 0, MPI_COMM_WORLD, &requestArray[i]);
        }
        MPI_Waitall(worldSize, requestArray, MPI_STATUS_IGNORE);
        MPI_Bcast(allProcessDisp, worldSize, MPI_AINT, 0, MPI_COMM_WORLD);
    }
    else {
        //send displacement 
        MPI_Send(&disp, 1, MPI_AINT, 0, 0, MPI_COMM_WORLD);
        MPI_Bcast(allProcessDisp, worldSize, MPI_AINT, 0, MPI_COMM_WORLD);
    }

    // here you can do RMA operations 
    // Each time you need an RMA operation you start with 
    double otherRankData = -1.0;
    int otherRank = 1;
    if (rank == 0) {
        MPI_Win_lock_all(0, win);
        MPI_Get(&otherRankData, 1, MPI_DOUBLE, otherRank, allProcessDisp[otherRank], 1, MPI_DOUBLE, win);
        // and end with 
        MPI_Win_unlock_all(win);
        printf("Rank 0 : Got %.2f from %d\n", otherRankData, otherRank);
    }

    if (rank == 1) {
        MPI_Win_lock_all(0, win);
        MPI_Put(myArray.data, ARRAY_SIZE, MPI_DOUBLE, 0, allProcessDisp[0], ARRAY_SIZE, MPI_DOUBLE, win);
        // and end with 
        MPI_Win_unlock_all(win);
    }

    printf("Rank %d: ", rank);
    for (i = 0; i < ARRAY_SIZE; i++)
        printf("%.2f ", myArray.data[i]);
    printf("\n");

    //set rank 0 array
    if (rank == 0) {
        for (i = 0; i < ARRAY_SIZE; i++)
            myArray.data[i] = -1.0;

        printf("Rank %d: ", rank);
        for (i = 0; i < ARRAY_SIZE; i++)
            printf("%.2f ", myArray.data[i]);
        printf("\n");
    }

    free(allProcessDisp);
    free(requestArray);
    free(myArray.data);

    MPI_Win_detach(win, myArray.data);
    MPI_Win_free(&win);
    MPI_Finalize();

    return 0;
}

既然 Hristo 先回答了,我已经给了他答案,但这也是一个很好的例子,谢谢! - davewy
没问题,@Hristo的回答总是非常好的,就像上面那个一样。 - Angelos
你为什么没有在allProcessDisp上使用Allgather? - Jeff Hammond
因为我正在回答这个问题的一部分:“我在使用MPI 3进行共享内存时遇到了完整代码示例的困难。” - Angelos

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接