先决条件:
- 并行引擎: OpenMP 3.1+ (如果需要,也可以是OpenMP 4.0)
- 并行结构:OpenMP 任务
- 编译器:gcc 4.9.x(支持OpenMP 4.0)
输入:
- 具有循环的C代码
- 循环具有跨迭代数据依赖关系,即“i + 1”迭代需要从“i”迭代中获取数据(仅此类依赖关系,没有其他内容)
- 循环主体可能存在部分依赖
- 循环不能分成两个循环;循环主体应保持完整
- 可以合理地添加到循环或循环主体函数定义中的任何内容
代码示例:
(这里只是为了说明而使用了conf / config / configData变量,主要是在value / valueData变量中。)
void loopFunc(const char* config, int* value)
{
int conf;
conf = prepare(config); // independent, does not change “config”
*value = process(conf, *value); // dependent, takes prev., produce next
return;
}
int main()
{
int N = 100;
char* configData; // never changes
int valueData = 0; // initial value
…
for (int i = 0; i < N; i++)
{
loopFunc(configData, &valueData);
}
…
}
需要:
- 使用omp任务并行化循环(不能使用omp for / omp sections)
- “准备”函数应与其他“准备”或“处理”函数并行执行
- “处理”函数应根据数据依赖关系排序
已提出和实施的方案:
- 定义整数标志
- 将其分配给第一次迭代的数量
- 每次迭代需要数据时,等待标志等于其迭代
- 当下一次迭代的数据准备好时,更新标志值
就像这样:
(我提醒您,conf / config / configData变量仅用于说明目的,主要兴趣在于value / valueData变量。)
void loopFunc(const char* config, int* value, volatile int *parSync, int iteration)
{
int conf;
conf = prepare(config); // independent, do not change “config”
while (*parSync != iteration) // wait for previous to be ready
{
#pragma omp taskyield
}
*value = process(conf, *value); // dependent, takes prev., produce next
*parSync = iteration + 1; // inform next about readiness
return;
}
int main()
{
int N = 100;
char* configData; // never changes
int valueData = 0; // initial value
volatile int parallelSync = 0;
…
omp_set_num_threads(5);
#pragma omp parallel
#pragma omp single
for (int i = 0; i < N; i++)
{
#pragma omp task shared(configData, valueData, parallelSync) firstprivate(i)
loopFunc(configData, &valueData, ¶llelSync, i);
}
#pragma omp taskwait
…
}
发生了什么:
它失败了。 :)
原因是openmp任务占用了openmp线程。 例如,如果我们定义了5个openmp线程(如上面的代码)。
- “For”循环生成100个任务。
- OpenMP运行时将5个任意任务分配给5个线程并启动这些任务。
如果已经开始的任务中没有i=0的任务(有时会发生),执行任务会一直等待,永远占据线程,并且i=0的任务永远不会开始。
接下来怎么办?
我没有其他想法如何实现所需的计算模式。
目前的解决方案
感谢下面的@parallelgeek提出的想法
int main()
{
int N = 10;
char* configData; // never changes
int valueData = 0; // initial value
volatile int parallelSync = 0;
int workers;
volatile int workingTasks = 0;
...
omp_set_num_threads(5);
#pragma omp parallel
#pragma omp single
{
workers = omp_get_num_threads()-1; // reserve 1 thread for task generation
for (int i = 0; i < N; i++)
{
while (workingTasks >= workers)
{
#pragma omp taskyield
}
#pragma omp atomic update
workingTasks++;
#pragma omp task shared(configData, valueData, parallelSync, workingTasks) firstprivate(i)
{
loopFunc(configData, &valueData, ¶llelSync, i);
#pragma omp atomic update
workingTasks--;
}
}
#pragma omp taskwait
}
}
process()
到底是什么(做什么)? - MichiOMP_WAIT_POLICY
将其设置为PASSIVE
,以便更好地进行任务调度... - Gilles