使用OpenMP实现C++并行矩阵求平均值

4

我有一段C++代码,可以计算矩阵每列的平均值。我想使用OpenMP并行化这段代码。

#include <vector>
#include <cstdlib>
#include <chrono>
#include <iostream>
using namespace std;

vector<double> average(const vector<vector<unsigned char>>& original){
  vector<vector<double>> result(original.size(), vector<double>(original[0].size()));
  vector<double> average(original[0].size(), 0.0);

  for (int i=0; i<original.size(); i++) {
    const vector<unsigned char>& vector = original[i];
    for (int k = 0; k < vector.size(); ++k) {
      average[k] += vector[k];
    }
  }
  for (double& val : average) {
    val /= original.size();
  }

  return average;
}

在外部for循环之前添加#pragma omp parallel for会导致我得到错误的结果。你有什么建议吗?我以为我会在网上找到大量关于此的示例,但并没有找到太多。这是我第一次使用OpenMP。


1
坦白地说,这更像是SIMD的工作而不是线程。您的优化器循环向量化程序可能已经做得很好了。用线程来超越它将会很困难。 - user4442671
在涉及线程之前,我会先看一下将值累加为整数,并仅在最后的除法步骤中将它们提升为双精度浮点数。 - user4442671
你处理的矩阵大小是多少? - Zulan
@Zulan 我一直在做有大约20,000列和500行的矩阵实验。 - Pablo M
3个回答

3

弗兰克说得对,你目前的问题可能是你正在使用一个非原子操作:

average[k] += vector[k];

您可以通过使用以下方式进行修复:
#pragma omp atomic
average[k] += vector[k];

但更大的概念问题是,这可能不会加速你的代码。你正在执行的操作非常简单,而且内存(至少行)是连续的。

确实,我已经为您的代码制作了一个最小工作示例(您应该对您的问题做到这一点):

#include <vector>
#include <cstdlib>
#include <chrono>
#include <iostream>
using namespace std;

vector<float> average(const vector<vector<unsigned char>>& original){
  vector<float> average(original[0].size(), 0.0);

  #pragma omp parallel for
  for (int i=0; i<original.size(); i++) {
    const vector<unsigned char>& vector = original[i];
    for (int k = 0; k < vector.size(); ++k) {
      #pragma omp atomic
      average[k] += vector[k];
    }
  }
  for (float& val : average) {
    val /= original.size();
  }

  return average;
}

int main(){
  vector<vector<unsigned char>> mat(1000);
  for(int y=0;y<mat.size();y++)
  for(int x=0;x<mat.size();x++)
    mat.at(y).emplace_back(rand()%255);

  std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
  double dont_optimize = 0;
  for(int i=0;i<100;i++){
    auto ret = average(mat);
    dont_optimize += ret[0];
  }
  std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();

  std::cout<<"Time = "<<(std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count()/100)<<std::endl;

  return 0;
}

使用 g++ -O3 temp.cpp -fopenmp 编译可以启用 OpenMP。在我的四核机器上,运行时间始终约为 10,247 微秒。禁用 OpenMP 后,运行时间约为 2,561 微秒。
开启和管理线程团队是昂贵的。
但有一种真正的方法可以加速您的代码:改善内存布局。
使用 std::vector< std::vector<T> > 设计意味着每个 vector<T> 可以位于内存中的任何位置。相反,我们希望所有内存都是连续的。我们可以通过使用平坦数组索引来实现这一点,如下所示:
(请注意,下面代码的早期版本使用了例如 mat.at(y*width+x)。这意味着范围检查导致了与现在使用的 mat[y*width+x] 相比显着的速度损失。已相应地更新时间。)
#include <vector>
#include <cstdlib>
#include <chrono>
#include <iostream>
using namespace std;

class Matrix {
 public:
  vector<unsigned char> mat;
  int width;
  int height;
  Matrix(int width0, int height0){
    width  = width0;
    height = height0;
    for(int i=0;i<width*height;i++)
      mat.emplace_back(rand()%255);
  }
  unsigned char& operator()(int x, int y){
    return mat[y*width+x];
  }
  unsigned char operator()(int x, int y) const {
    return mat[y*width+x];
  }
  unsigned char& operator()(int i){
    return mat[i];
  }
  unsigned char operator()(int i) const {
    return mat[i];
  }
};

vector<float> average(const Matrix& original){
  vector<float> average(original.width, 0.0);

  #pragma omp parallel for
  for(int y=0;y<original.height;y++)
  for(int x=0;x<original.width;x++)
    #pragma omp atomic
    average[x] += original(x,y);

  for (float& val : average) 
    val /= original.height;

  return average;
}

int main(){
  Matrix mat(1000,1000);

  std::cerr<<mat.width<<" "<<mat.height<<std::endl;

  std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now();
  double dont_optimize = 0;
  for(int i=0;i<100;i++){
    auto ret = average(mat);
    dont_optimize += ret[0];
  }
  std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();

  std::cout<<"Time = "<<(std::chrono::duration_cast<std::chrono::microseconds>(end - begin).count()/100)<<std::endl;

  return 0;
}

请注意,我使用的是float而不是double:这样可以将两倍的数字压缩到同样的空间中,这对于缓存来说很有好处。
这将在没有OpenMP的情况下运行292微秒,在OpenMP的情况下运行9426微秒。
总之,使用OpenMP/并行会使代码变慢,因为并行处理所需的时间比设置并行处理的时间更短,但使用更好的内存布局可以提高约90%的速度。此外,使用方便的Matrix类可以提高代码的可读性和可维护性。
编辑:
将其运行在10,000x10,000而不是1,000x1,000的矩阵上会得到类似的结果。对于向量的向量:没有OpenMP的情况下为7,449微秒,有OpenMP的情况下为156,316微秒。对于平面数组索引:没有OpenMP的情况下为32,668微秒,有OpenMP的情况下为145,470微秒。
性能可能与我的机器上可用的硬件指令集有关(特别是,如果我的机器缺少原子指令,那么OpenMP将不得不用互斥锁等模拟它们)。事实上,在平面数组示例中,使用-march=native编译可以改善OpenMP的性能,尽管仍然不太好:没有OpenMP的情况下为33,079微秒,有OpenMP的情况下为127,841微秒。我稍后会尝试在更强大的机器上进行实验。
编辑:
虽然上述测试是在Intel(R) Core(TM) i5 CPU M 480 @ 2.67GHz上执行的,但我已经在超级厉害的Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz上编译了这段代码(使用-O3 -march=native)。结果类似:
  • 1000x1000,向量的向量,没有OpenMP:145μs
  • 1000x1000,向量的向量,有OpenMP:2,941μs
  • 10000x10000,向量的向量,没有OpenMP:20,254μs
  • 10000x10000,向量的向量,有OpenMP:85,703μs
  • 1000x1000,平面数组,没有OpenMP:139μs
  • 1000x1000,平面数组,有OpenMP:3,171μs
  • 10000x10000,平面数组,没有OpenMP:18,712μs
  • 10000x10000,平面数组,有OpenMP:89,097μs
这证实了我们之前的结果:即使你的硬件非常棒,使用OpenMP也往往会使事情变慢。实际上,两个处理器之间的大部分加速可能是由于Xeon的大型L3缓存大小:它的大小为30,720K,比i5上的3,720K缓存大10倍。

编辑

Zulan的简化策略从下面的答案中纳入,可以有效地利用并行处理:

vector<float> average(const Matrix& original){
  vector<float> average(original.width, 0.0);
  auto average_data = average.data();

  #pragma omp parallel for reduction(+ : average_data[ : original.width])
  for(int y=0;y<original.height;y++){
    for(int x=0;x<original.width;x++)
      average_data[x] += original(x,y);
  }

  for (float& val : average) 
    val /= original.height;

  return average;
}

对于24个线程,在10,000x10,000的数组上运行时间为2629微秒:相对于串行版本,提高了7.1倍。在您原始代码上使用Zulan的策略(不使用平坦的数组索引)需要3529微秒,因此通过使用更好的布局仍然可以获得25%的加速。


1
哇,非常感谢您提供如此详细的解释!我原以为可以轻松获得很多性能,但显然不是这样。我将尝试运行一些实验,看看是否可以在大矩阵上获得收益。 - Pablo M
这个答案的结论严重偏向于假设矩阵很小。 - Zulan
@Zulan:结论的一部分。无论矩阵大小如何,使用平坦数组索引都会更快。尽管如此,我现在对10,000x10,000的矩阵进行测试,并得出相同的结果:我假设这是由于原子操作的同步效应所致。您可以将大小调整得更大,看看是否会发现不同的结果。 - Richard
我已经发布了一个补充回答,解释了如何使用线程来实际加速此过程。我相当确定您的性能问题源于在“Mat :: operator()”实现中使用运行时检查的“.at”,而不是“operator []”。通常,编译器将为具有这种访问方式的循环生成合理的代码,您无需手动进行连续遍历。 - Zulan
@Zulan:使用operator[]而不是.at()确实提高了速度;我已经相应地修改了时间。我认为编译器仍然生成次优代码,因为它们使用vaddss而不是vaddps - Richard
我使用GCC 7.1的-O3 -march=native选项,加上大量混乱的整数/浮点转换,得到了一个漂亮的4路展开的vaddps ymm循环。 - Zulan

2

Frank和Richard的基本问题是正确的。有关内存布局的提示也是正确的。但是,可以比使用原子操作更好地完成此任务。仅使用原子增量的并行循环不太可能扩展得很好,因为原子数据访问非常昂贵,并且从所有线程向完全共享的内存空间写入会导致缓存性能下降。

归约

基本思路是首先计算本地求和向量,然后稍后安全地将这些向量相加。这样,大部分工作都可以独立且高效地完成。最近的OpenMP版本使其非常方便。

以下是示例代码,基于Richard的示例-但我交换了索引并修复了operator()的效率。

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <vector>

class Matrix {
public:
  std::vector<unsigned char> mat;
  int width;
  int height;
  Matrix(int width0, int height0) {
    srand(0);
    width = width0;
    height = height0;
    for (int i = 0; i < width * height; i++)
      mat.emplace_back(rand() % 255);
  }
  unsigned char &operator()(int row, int col) { return mat[row * width + col]; }
  unsigned char operator()(int row, int col) const {
    // do not use at here, the extra check is too expensive for the tight loop
    return mat[row * width + col];
  }
};

std::vector<float> __attribute__((noinline)) average(const Matrix &original) {
  std::vector<float> average(original.width, 0.0);
  // We can't do array reduction directly on vectors
  auto average_data = average.data();

  #pragma omp parallel reduction(+ : average_data[ : original.width])
  {
    #pragma omp for
    for (int row = 0; row < original.height; row++) {
      for (int col = 0; col < original.width; col++) {
        average_data[col] += original(row, col);
      }
    }
  }
  for (float &val : average) {
    val /= original.height;
  }
  return average;
}

int main() {
  Matrix mat(500, 20000);

  std::cerr << mat.width << " " << mat.height << std::endl;

  std::chrono::steady_clock::time_point begin = chrono::steady_clock::now();
  double dont_optimize = 0;
  for (int i = 0; i < 100; i++) {
    auto ret = average(mat);
    dont_optimize += ret[0];
  }
  std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();

  std::cout << "Time = "
            << (std::chrono::duration_cast<std::chrono::microseconds>(end-begin).count() / 100.)
            << "\n" << optimize << std::endl;
  return 0;
}

对于给定的矩阵大小,使用12个线程在2.5 GHz名义频率的Intel Xeon E5-2680 v3上,将时间从约1.8毫秒减少到约0.3毫秒。

循环切换

另一种方法是并行化内部循环,因为它的迭代彼此独立。但是,由于每个线程的工作量较小,这样会更慢。然后可以交换内部和外部循环,但这会使内存访问不连续,这也会影响性能。因此,最好的方法是将内部循环拆分如下:

constexpr size_t chunksize = 128;
#pragma omp parallel for
for (size_t col_chunk = 0; col_chunk < original.width; col_chunk += chunksize) {
  for (size_t row = 0; row < original.height; row++) {
    const auto col_end = std::min(col_chunk + chunksize, original.width);
    for (size_t col = col_chunk; col < col_end; col++) {

这样可以让您合理地连续访问内存,同时避免所有线程之间的交互。然而,在线程边界处仍可能存在一些虚假共享。我无法轻松地获得非常好的性能,但在足够数量的线程下仍比串行更快。


这很好-谢谢。从这里唯一可以去的地方就是让AVX工作起来。 - Richard

0

average[k] += vector[k];不是原子操作。

每个线程可能(并且很可能)在相同的时间读取当前的k值,添加到它上面,并将值写回。

这些类型的跨线程数据竞争是未定义行为。

编辑: 一个简单的解决方法是倒转循环顺序,并在k循环上进行并行化。这样,每个线程只会写入一个值。但是,您将通过K乘以顶层向量上的查找次数,因此可能不会获得太大的性能提升,因为您将开始强制使用缓存。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接