MPI: sending a sub-array from each slave process back to the root/master process to update the main array


Apologies for the rather long code. I've just started learning MPI and ran into trouble while trying to parallelize a sorting algorithm. I have a huge array of integers that needs to be sorted faster by splitting it into equal parts and handing those to slave processes. Once the slaves are done, they must send their sub-arrays back to the root process (merge) for further processing. So: the result has to be the global array patched together from the semi-sorted sub-arrays. I've tried looking through previous questions on this forum, but couldn't work it out after a whole day. Please don't be too hard on me, I'm still new to MPI.

void Batcher( char *inputFile,   string outputFile, int N  ){

   int initialised;
   MPI_Initialized(&initialised);
   //int rank;
   if (!initialised)
   {
      MPI_Init(NULL, NULL);
      atexit(finalized);
   }
   //
   //BEGIN : FILE READING FOR ARRAY
   //
   //Get number of processes
   int world_size;
   int root = 0;
   MPI_Comm_size(MPI_COMM_WORLD, &world_size);
   N=world_size;
   //Get the rank of the process
   int rank;
   MPI_Comm_rank(MPI_COMM_WORLD , &rank);

   int *global_array;
   int *sub_array_per_process;
   int element_count;

   if ( rank == root ){
       // ifstream input;
       ofstream output;
       std::ifstream input( inputFile, std::ifstream::in);
       //get number of integers in the file so they may be populated into the array
       int counter=0;
       string line;
       while( std::getline(input,line)){
          if ( line !="")          ++counter;
       }

       int numbers[counter];
       global_array = new int [counter];
       global_array = &numbers[0];


       int current;
       int index = 0;

       //reset read pointer to beginning of input file
       input.clear();
       input.seekg(0, ios::beg);
      //get number from inputfile and add to array numbers 
       while( input >> current){
          numbers[index]=current;
          index++;
          // cout<<((int) a);
       }
      global_array = numbers;

      for(int i=0; i<counter; i++)
          global_array[i]=numbers[i];//<<endl;

       for(int i=0; i<counter; i++)
          cout<<"global "<< global_array[i]<< " numbers " <<numbers[i] <<endl;


       element_count = counter; 
       input.close();


       /* 
       Send tasks to slaves  */

        int NON = element_count / (N - 1 );
        for(int i=root+1; i< world_size ; i++){
           int start = get_preceeding_ranks(i )*NON;
           //     cout<<"start is "<< start <<endl;
           MPI_Send(&global_array[start], NON, MPI_INT,i, 1 ,MPI_COMM_WORLD);

       }


      MPI_Status status;
      int temp[counter];

   } // End root process operation

    MPI_Bcast(&element_count, 1, MPI_INT,root,MPI_COMM_WORLD );

    int NON = element_count / (N - 1 );
     //Receive local sub-job from root
    if ( rank != root ){
       MPI_Status status;
       MPI_Recv(sub_array_per_process,NON, MPI_INT , root, 1 , MPI_COMM_WORLD ,&status );

    }
    int n_per_small_chunk = sizeof(sub_array_per_process) / sizeof(sub_array_per_process[0]) ;

    oddEvenMergeSort(sub_array_per_process,0, n_per_small_chunk);

    cout<<"After sorting processwise sorting.... "<<endl;
    if ( rank != root ){
         for(int i=0;i<NON;i++)
          cout<<"rank  : "<< rank <<"  data = "<< sub_array_per_process[ i] << endl;
    }
 //   MPI_Bcast(global_array, element_count , MPI_INT,root,MPI_COMM_WORLD );
 //sub_array_per_process = new int[2];

    MPI_Barrier(MPI_COMM_WORLD  ) ;

   if (rank == root ){
       int start ;
       int sender = -1;
       for ( int i= 0; i< N; i++){
            start = get_preceeding_ranks(i+1 )*NON;
             MPI_Status status;
       cout<<" start = "<<start<<endl;
       sender = i+1;
           MPI_Recv(&global_array[start], NON  , MPI_INT , sender , 1, MPI_COMM_WORLD ,&status );
           cout<<" Received  " << global_array[start] <<" from " << sender <<   endl
           ;
           // MPI_Bcast(global_array, elem , MPI_INT,root,MPI_COMM_WORLD );

        }
         for ( int j=0; j< element_count; j++ )  
             cout<<" Received  " << global_array[j] <<" from " <<sender <<   endl;

   }
   else    {  //Send to root your sub-array..
     // for (int j=1 ; j < N; i++ )
         for ( int i=0;i<NON; i++)
             MPI_Send(&sub_array_per_process[i], NON , MPI_INT,0, 1 ,MPI_COMM_WORLD);
  }

   MPI_Barrier(MPI_COMM_WORLD  ) ;

   for ( int j=0; j< element_count; j++ )  
     cout<<" iOutside  " << global_array[j] <<" from "<<   endl;



   MPI_Finalize();

}

int main() {
  string output;//Dummy

  char* inputFile ="inputsmall.txt";
  int N=0; //Dummy


  Batcher( inputFile,  output, N  );

  return 0 ;
}

Since the code is long, here is the part where I'm particularly stuck:

   if (rank == root ){
           int start ;
           int sender = -1;
           for ( int i= 0; i< N; i++){ //get all submitted sub-arrays from slaves. 
                start = get_preceeding_ranks(i+1 )*NON;
                 MPI_Status status;
           cout<<" start = "<<start<<endl;
           sender = i+1;
               MPI_Recv(&global_array[start], NON  , MPI_INT , sender , 1, MPI_COMM_WORLD ,&status );
               cout<<" Received  " << global_array[start] <<" from " << sender <<   endl
               ;


            }
             for ( int j=0; j< element_count; j++ )  
                 cout<<" Received  " << global_array[j] <<" from " <<sender <<   endl;

       }
       else    {  //Send to root your sub-array..
                for ( int i=0;i<NON; i++)
                 MPI_Send(&sub_array_per_process[i], NON , MPI_INT,0, 1 ,MPI_COMM_WORLD);
      }

The output I get is:
     start = 2
 Received  2 from 1
 start = 4
 Received  3 from 2
 start = 6
 Received  4 from 3
 start = 8
[tux2:25061] *** Process received signal ***
[tux2:25061] Signal: Segmentation fault (11)
[tux2:25061] Signal code:  (128)
[tux2:25061] Failing at address: (nil)
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 25061 on node tux2 exited on signal 11 (Segmentation fault).
--------------------------------------------------------------------------

Don't add the C tag to C++ code. – too honest for this site
Noted, thanks. That's because the code uses the C bindings rather than the C++ ones; the C++ bindings are being deprecated anyway. – Walker
So element_count seems to be your array length, but what does get_preceeding_ranks(i + 1) do? I can't find its definition. It looks like you are implementing MPI_Scatter yourself; you'd be better off just using it: http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node72.html – haraldkl
Thanks a lot, I've worked through the tutorial and learned a great deal. Since MPI_Scatter and MPI_Gather already do what I need, my code is now simple and easy to follow. The tutorial you recommend is similar to the one @Pooja suggested: http://mpitutorial.com/tutorials/mpi-scatter-gather-and-allgather/ – Walker
1 Answer


From your code I can see that when rank equals root you are receiving the sorted sub-arrays from the slave processes. The problem is in your for loop: you try to receive from processes i+1 up to N, but your communicator has only N processes in total (because you set N = world_size), so the slaves are ranks 1 to N-1. You therefore need to change the for statement to:

for ( int i= 0; i< (N-1); i++){ //To receive data from process i=1 to i=N-1
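
To make the indexing explicit, here is a minimal sketch of what the whole receive loop on the root could look like (not your exact code). The offset (sender - 1) * NON is an assumption that matches how the chunks were sent out, i.e. it assumes get_preceeding_ranks(i) evaluates to i - 1 for the slave ranks; each slave should issue exactly one matching MPI_Send of its NON ints.

// Sketch: receive one chunk of NON ints from each slave rank 1..N-1
// and patch it into global_array at that slave's original offset.
for (int sender = 1; sender < N; ++sender) {
    MPI_Status status;
    int start = (sender - 1) * NON;   // assumes chunks were handed out in rank order
    MPI_Recv(&global_array[start], NON, MPI_INT, sender, 1, MPI_COMM_WORLD, &status);
}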

One more point: written this way, your code is hard to debug and maintain. You will find it much easier to use MPI_Scatter and MPI_Gather. Have a look at this tutorial.
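
For illustration, here is a minimal sketch of that scatter/sort/gather pattern (not your original code). It assumes the element count divides evenly by the number of processes, lets the root sort a chunk as well, and uses std::sort in place of your oddEvenMergeSort just to keep the example short:

#include <mpi.h>
#include <algorithm>

// Sketch: scatter equal chunks, sort each locally, gather them back in rank order.
// global_array only needs to be valid on the root; element_count must be
// divisible by the number of processes.
void scatter_sort_gather(int *global_array, int element_count) {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int chunk = element_count / size;        // elements handled by each rank
    int *local = new int[chunk];

    // Root hands one contiguous chunk to every rank, including itself.
    MPI_Scatter(global_array, chunk, MPI_INT,
                local, chunk, MPI_INT, 0, MPI_COMM_WORLD);

    std::sort(local, local + chunk);         // each rank sorts its own chunk

    // Root collects the sorted chunks back into global_array, in rank order.
    MPI_Gather(local, chunk, MPI_INT,
               global_array, chunk, MPI_INT, 0, MPI_COMM_WORLD);

    delete[] local;
}

After the gather the chunks are still only sorted individually, so the root still needs a final merge (for example your odd-even merge) to obtain one fully sorted array.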


Thanks, it seems I was reinventing the wheel by writing my own MPI_Scatter and MPI_Gather. That tutorial helped me a lot. – Walker
