OpenMP while循环

5

我有一段代码,它运行许多迭代,只有在满足条件时才保存迭代的结果。这自然地表达为一个while循环。我试图让代码并行运行,因为每个实现都是独立的。所以我有以下代码:

while(nit<avit){
    #pragma omp parallel shared(nit,avit)
    {
        //do some stuff
        if(condition){
            #pragma omp critical
            {
                nit++;
                \\save results
            }
        }
    }//implicit barrier here
}

这很好用...但每次实现后都有一个屏障,这意味着如果并行块中的操作在某一次迭代中比其他操作花费更长时间,那么所有线程都会等待其完成,而不是继续下一次迭代。
有没有办法避免这个屏障,使线程继续工作?我要执行数千次迭代,所以多几次不会有影响(如果“nit”变量尚未在已运行的线程中递增)...
我尝试将其转换为并行for循环,但for循环中的自动递增使得“nit”变量失控。这是我的尝试:
#pragma omp parallel shared(nit,avit)
{
    #pragma omp for
    for(nit=0;nit<avit;nit++){
        //do some stuff
        if(condition){
            \\save results
        } else {
            #pragma omp critical
            {
                nit--;
            }
        }
    }
}

这段代码与预期一样在for循环中继续工作和运行,但是我的nit变量却取决于不同线程在不同时间内的增加和减少,导致值无法预测。

我还尝试将for循环中的增量留空,但编译失败,或试图在代码中欺骗没有for循环增量,例如

...
incr=0;
for(nit=0;nit<avit;nit+=incr)
...

但是我的代码崩溃了...

有什么想法吗?

谢谢。

编辑:这里是一个在while循环中的工作最小化示例:

#include <random>
#include <vector>
#include <iostream>
#include <time.h>
#include <omp.h>
#include <stdlib.h>
#include <unistd.h>

using namespace std;

int main(){

    int nit,dit,avit=100,t,j,tmax=100,jmax=10;
    vector<double> Res(10),avRes(10);

    nit=0; dit=0;
    while(nit<avit){
        #pragma omp parallel shared(tmax,nit,jmax,avRes,avit,dit) private(t,j) firstprivate(Res)
        {
            srand(int(time(NULL)) ^ omp_get_thread_num());
            t=0; j=0;
            while(t<tmax&&j<jmax){
                Res[j]=rand() % 10;
                t+=Res[j];
                if(omp_get_thread_num()==5){
                    usleep(100000);
                }
                j++;
            }
            if(t<tmax){
                #pragma omp critical
                {
                    nit++;
                    for(j=0;j<jmax;j++){
                        avRes[j]+=Res[j];
                    }
                    for(j=0;j<jmax;j++){
                        cout<<avRes[j]/nit<<"\t";
                    }
                    cout<<" \t nit="<<nit<<"\t thread: "<<omp_get_thread_num();
                    cout<<endl;
                }
            } else{
                #pragma omp critical
                {
                    dit++;
                    cout<<"Discarded: "<<dit<<"\r"<<flush;
                }
            }
        }
    }
    return 0;
}

我添加了usleep部分来模拟一个线程的执行时间比其他线程长。如果你运行这个程序,所有的线程都必须等待第5个线程完成后才能开始下一轮执行。而我想要做的正是避免这种等待,也就是说,我希望其他线程在不等待5号线程完成的情况下继续执行下一次迭代。


如果没有更具体的了解某些东西,就很难正确回答这个问题。特别是我们不知道nit是否在某些东西中被访问,当多个线程同时拥有condition时会发生什么,或者当一个线程正在执行某些东西时,nit是否被多次更新... 这很困难,但为了得到一个好的、具体的答案,你必须创建一个[mcve]。 - Zulan
感谢@Zulan的评论。我编辑了问题,在结尾处添加了一个最小工作示例。希望这样能澄清事情。 - Francisco Herrerias-Azcue
1个回答

4
您可以基本上按照与此问题相同的概念操作,稍微变化以确保avRes不会并行写入:
int nit = 0;
#pragma omp parallel
while(1) {
     int local_nit;
     #pragma omp atomic read
     local_nit = nit;
     if (local_nit >= avit) {
          break;
     }

     [...]

     if (...) { 
          #pragma omp critical
          {
                #pragma omp atomic capture
                local_nit = ++nit;
                for(j=0;j<jmax;j++){
                    avRes[j] += Res[j];
                } 
                for(j=0;j<jmax;j++){
                    // technically you could also use `nit` directly since
                    // now `nit` is only modified within this critical section
                    cout<<avRes[j]/local_nit<<"\t";
                }
          }
     } else {
          #pragma omp atomic update
          dit++;
     }
 }

它也可以与关键区域一起使用,但原子操作更高效。

还有一件事需要考虑,rand() 不应该在并行上下文中使用。参见此问题。对于C++,应该使用<random>中的私有(即在并行区域内定义)随机数生成器。


谢谢@Zulan,稍作修改后,这似乎(几乎)可以解决问题!我对atomic不是很熟悉,但它不允许我像critical一样添加代码块...我需要所有线程都能够写入结果,并且它们比仅在最后更频繁地写入...如果我使用#pragma omp critical {local_nit = nit ++; for(j=0;j<jmax;j++){avRes[j]+=Res[j];}[...]},它可以工作,但我想知道如何使用atomic,因为你提到它更有效率。另外,第一个问题的链接似乎有误... - Francisco Herrerias-Azcue
这有点更有意义了...但这只会在程序末尾再次写入,对吗?我需要能够更频繁地检查结果,所以我认为我确实需要一个关键段...我能否在if语句中“隐藏它”:#pragma omp atomic capture local_wnit=++wnit; if(local_wnit==wavit){ #pragma omp critical { \\write; wnit-=local_wnit;} } 其中wnit是全局写计数器,local_wnit是本地处理的变量,wavit是一个共享变量,表示写事件之间的迭代次数。这样做有帮助还是一样的?感谢您修复链接。 - Francisco Herrerias-Azcue
如果您需要在每个增量更新结果(抱歉,我误读了您的问题),那么最好使用关键段进行更新。请参见我的编辑答案。如果性能是一个问题,并且jmax相对较小,则可以使用reduction来进行求和。最重要的是,您不希望在关键段中读取计数器。 - Zulan
谢谢@Zulan。为什么在关键部分读取计数器是不好的?我需要输出avRes/nit,所以是否最好做#pragma omp atomic update local_nit=++nit; #pragma omp critical { for(j=0;j<jmax;j++){ avRes[j] += Res[j]; cout<<avRes[j]/local_nit;}?在我的当前代码中,我在关键部分增加了nit,然后直接使用nit进行平均值计算,因为我认为这样可以防止使用错误的nit计算平均值...但我没有意识到在关键部分内读取计数器是不好的形式... - Francisco Herrerias-Azcue
计数器在每次迭代中都会被读取,临界区比原子读取显然更加昂贵。是的,最好使用局部副本来计算平均值。但是,在进入临界区之前这样做是危险的,因为可能会使用过时的nit。相反,将++nit移动到临界区内。然后无论您使用nit还是local_nit都没有关系。请查看我的更新。我认为输出对于您的示例并不重要。 - Zulan
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接