部分并行循环使用OpenMP任务

6

先决条件:

  • 并行引擎: OpenMP 3.1+ (如果需要,也可以是OpenMP 4.0)
  • 并行结构:OpenMP 任务
  • 编译器:gcc 4.9.x(支持OpenMP 4.0)

输入:

  • 具有循环的C代码
  • 循环具有跨迭代数据依赖关系,即“i + 1”迭代需要从“i”迭代中获取数据(仅此类依赖关系,没有其他内容)
  • 循环主体可能存在部分依赖
  • 循环不能分成两个循环;循环主体应保持完整
  • 可以合理地添加到循环或循环主体函数定义中的任何内容

代码示例:

(这里只是为了说明而使用了conf / config / configData变量,主要是在value / valueData变量中。)

void loopFunc(const char* config, int* value)
{
    int conf;
    conf = prepare(config);         // independent, does not change “config”
    *value = process(conf, *value); // dependent, takes prev., produce next
    return;
}

int main()
{
    int N = 100;
    char* configData;           // never changes
    int valueData = 0;          // initial valuefor (int i = 0; i < N; i++)
    {
        loopFunc(configData, &valueData);
    }
    …
}

需要:

  • 使用omp任务并行化循环(不能使用omp for / omp sections)
  • “准备”函数应与其他“准备”或“处理”函数并行执行
  • “处理”函数应根据数据依赖关系排序

已提出和实施的方案:

  • 定义整数标志
  • 将其分配给第一次迭代的数量
  • 每次迭代需要数据时,等待标志等于其迭代
  • 当下一次迭代的数据准备好时,更新标志值

就像这样:

(我提醒您,conf / config / configData变量仅用于说明目的,主要兴趣在于value / valueData变量。)

void loopFunc(const char* config, int* value, volatile int *parSync, int iteration)
{
    int conf;
    conf = prepare(config);         // independent, do not change “config”
    while (*parSync != iteration)   // wait for previous to be ready
    {
        #pragma omp taskyield
    }
    *value = process(conf, *value); // dependent, takes prev., produce next
    *parSync = iteration + 1;       // inform next about readiness
    return;
}

int main()
{
    int N = 100;
    char* configData;           // never changes
    int valueData = 0;          // initial value
    volatile int parallelSync = 0;
    …
    omp_set_num_threads(5);
    #pragma omp parallel
    #pragma omp single
    for (int i = 0; i < N; i++)
    {
        #pragma omp task shared(configData, valueData, parallelSync) firstprivate(i)
            loopFunc(configData, &valueData, &parallelSync, i);
    }
    #pragma omp taskwait
    …
}

发生了什么:

它失败了。 :)

原因是openmp任务占用了openmp线程。 例如,如果我们定义了5个openmp线程(如上面的代码)。

  • “For”循环生成100个任务。
  • OpenMP运行时将5个任意任务分配给5个线程并启动这些任务。

如果已经开始的任务中没有i=0的任务(有时会发生),执行任务会一直等待,永远占据线程,并且i=0的任务永远不会开始。

接下来怎么办?

我没有其他想法如何实现所需的计算模式。

目前的解决方案

感谢下面的@parallelgeek提出的想法

int main()
{
    int N = 10;
    char* configData;           // never changes
    int valueData = 0;          // initial value
    volatile int parallelSync = 0;
    int workers;
    volatile int workingTasks = 0;
    ...
    omp_set_num_threads(5);
    #pragma omp parallel
    #pragma omp single
    {
        workers = omp_get_num_threads()-1;  // reserve 1 thread for task generation

        for (int i = 0; i < N; i++)
        {
            while (workingTasks >= workers)
            {
                #pragma omp taskyield
            }

            #pragma omp atomic update
                workingTasks++;

            #pragma omp task shared(configData, valueData, parallelSync, workingTasks) firstprivate(i)
            {
                loopFunc(configData, &valueData, &parallelSync, i);

                #pragma omp atomic update
                    workingTasks--;
            }
        }
        #pragma omp taskwait
    }
}

1
让我们看一下,你声明了 *char configData;**,然后将它传递给 **loopFunc(),在loopFunc()**内部,你将它传递给 **process(conf, *value);问题:** process() 到底是什么(做什么)? - Michi
@Michi:这里的conf/config/configData变量仅用于说明目的,主要关注点在于value/valueData变量。已更新帖子。“Prepare”说明了一个与值无关但需要时间且不与其他迭代交互的函数。“Process”说明了一个从上一次迭代中获取数据并为下一次迭代生成新值的函数。 - Badiboy
@Gilles,“int &xxx”据我所知,只能在C++中使用?我需要与C兼容...因此“flush”似乎根本不可能,因为如果omp不允许刷新类似于flush(*parSync)的解除引用的值... - Badiboy
2
还有一件困扰我的事情是标准说“每当一个线程到达任务调度点时,实现可能会导致其执行任务切换,开始或恢复执行绑定到当前团队的不同任务。”这里没有义务,因此您的代码行为无论如何都是实现定义的,不是吗?但是以防万一,我也会尝试使用OMP_WAIT_POLICY将其设置为PASSIVE,以便更好地进行任务调度... - Gilles
@Gilles,是的,解决方案的行为是不可预测的,这是主要问题。这就是为什么我在这里提出问题的原因。 - Badiboy
去掉 volatile 怎么样?你已经向编译器提供了足够的信息,表明 workingTasks 是一个共享变量,所以在我看来,它不应该再施加它的黑魔法了。关于这个问题,OpenMP 标准有什么说法? - parallelgeek
1个回答

1
  1. 据我所知,volatile变量不能防止硬件重排序,这就是为什么你可能会在内存中出现混乱的原因,因为数据尚未被写入,而标志已经被消费线程视为true。
  2. 这就是为什么建议使用C11原子操作来确保数据的可见性。据我所知,gcc 4.9支持c11 C11Status in GCC
  3. 您可以尝试将生成的任务分成K个任务组,其中K == ThreadNum,并且只有在运行的任何任务完成后才开始生成后续任务(在第一组任务生成后)。因此,您有一个不变量,即每次您只有K个任务正在运行并计划在K个线程上运行
  4. 使用C11的原子标志也可以满足任务之间的依赖关系。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接