最高效的遍历Eigen矩阵的方法是什么?

15

我正在创建一些函数来执行像负数和正数的“分离求和”,kahan,成对等其他操作,其中从矩阵中取元素的顺序并不重要,例如:

template <typename T, int R, int C>
inline T sum(const Eigen::Matrix<T,R,C>& xs)
{
  T sumP(0);
  T sumN(0);
  for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nRows; ++i)
   for (size_t j = 0; j < nCols; ++j)
   {
        if (xs(i,j)>0)
          sumP += xs(i,j);
        else if (xs(i,j)<0) //ignore 0 elements: improvement for sparse matrices I think
          sumN += xs(i,j);
   }
 return sumP+sumN;
}

现在,我希望尽可能地提高效率,所以我的问题是,是像上面那样循环遍历每一行的每一列更好,还是像下面这样做相反的操作:

for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nCols; ++i)
  for (size_t j = 0; j < nRows; ++j)

我认为这取决于矩阵元素在内存中分配的顺序,但我在Eigen的手册中找不到相关信息。此外,是否有其他替代方法(例如使用迭代器,Eigen是否支持?),可能会更快一些?
5个回答

19

我进行了一些基准测试,以查看哪种方式更快,我得到了以下结果(以秒为单位):

12
30
3
6
23
3

第一行按照@jleahy的建议进行迭代。 第二行按照我在问题中的代码进行迭代(与@jleahy相反)。 第三行使用PlainObjectBase :: data()进行迭代,如下所示:for (int i = 0; i < matrixObject.size(); i++)。 其他三行重复上述内容,但使用@lucas92建议的临时变量。 我还进行了相同的测试,但使用/if else。*/替换/else/(对于稀疏矩阵没有特殊处理),并得到以下结果(以秒为单位):
10
27
3
6
24
2

再次进行测试,结果非常相似。我使用的是g++ 4.7.3-O3。代码如下:

#include <ctime>
#include <iostream>
#include <Eigen/Dense>

using namespace std;

 template <typename T, int R, int C>
    inline T sum_kahan1(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nCols; ++i)
      for (size_t j = 0; j < nRows; ++j)
      {
          if (xs(j,i)>0)
          {
            yP = xs(j,i) - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else if (xs(j,i)<0)
          {
            yN = xs(j,i) - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan2(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nRows; ++i)
      for (size_t j = 0; j < nCols; ++j)
      {
          if (xs(i,j)>0)
          {
            yP = xs(i,j) - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else if (xs(i,j)<0)
          {
            yN = xs(i,j) - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }


 template <typename T, int R, int C>
    inline T sum_kahan3(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
    for (size_t i = 0, size = xs.size(); i < size; i++)
      {
          if ((*(xs.data() + i))>0)
          {
            yP = (*(xs.data() + i)) - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else if ((*(xs.data() + i))<0)
          {
            yN = (*(xs.data() + i)) - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan1t(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nCols; ++i)
      for (size_t j = 0; j < nRows; ++j)
      {
      T temporary = xs(j,i);
          if (temporary>0)
          {
            yP = temporary - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else if (temporary<0)
          {
            yN = temporary - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan2t(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nRows; ++i)
      for (size_t j = 0; j < nCols; ++j)
      {
      T temporary = xs(i,j);
          if (temporary>0)
          {
            yP = temporary - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else if (temporary<0)
          {
            yN = temporary - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }


 template <typename T, int R, int C>
    inline T sum_kahan3t(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
    for (size_t i = 0, size = xs.size(); i < size; i++)
      {
      T temporary = (*(xs.data() + i));
          if (temporary>0)
          {
            yP = temporary - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else if (temporary<0)
          {
            yN = temporary - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan1e(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nCols; ++i)
      for (size_t j = 0; j < nRows; ++j)
      {
          if (xs(j,i)>0)
          {
            yP = xs(j,i) - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else
          {
            yN = xs(j,i) - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan2e(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nRows; ++i)
      for (size_t j = 0; j < nCols; ++j)
      {
          if (xs(i,j)>0)
          {
            yP = xs(i,j) - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else
          {
            yN = xs(i,j) - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }


 template <typename T, int R, int C>
    inline T sum_kahan3e(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
    for (size_t i = 0, size = xs.size(); i < size; i++)
      {
          if ((*(xs.data() + i))>0)
          {
            yP = (*(xs.data() + i)) - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else
          {
            yN = (*(xs.data() + i)) - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan1te(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nCols; ++i)
      for (size_t j = 0; j < nRows; ++j)
      {
      T temporary = xs(j,i);
          if (temporary>0)
          {
            yP = temporary - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else
          {
            yN = temporary - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }

 template <typename T, int R, int C>
    inline T sum_kahan2te(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
      for (size_t i = 0, nRows = xs.rows(), nCols = xs.cols(); i < nRows; ++i)
      for (size_t j = 0; j < nCols; ++j)
      {
      T temporary = xs(i,j);
          if (temporary>0)
          {
            yP = temporary - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else
          {
            yN = temporary - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }


 template <typename T, int R, int C>
    inline T sum_kahan3te(const Eigen::Matrix<T,R,C>& xs) {
      if (xs.size() == 0) return 0;
      T sumP(0);
      T sumN(0);
      T tP(0);
      T tN(0);
      T cP(0);
      T cN(0);
      T yP(0);
      T yN(0);
    for (size_t i = 0, size = xs.size(); i < size; i++)
      {
      T temporary = (*(xs.data() + i));
          if (temporary>0)
          {
            yP = temporary - cP;
          tP = sumP + yP;
          cP = (tP - sumP) - yP;
          sumP = tP;
          }
        else
          {
            yN = temporary - cN;
          tN = sumN + yN;
          cN = (tN - sumN) - yN;
          sumN = tN;
          }
      }
      return sumP+sumN;
    }


int main() {

    Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> test = Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>::Random(10000,10000);

    cout << "start" << endl;   
    int now;

    now = time(0);
    sum_kahan1(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan2(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan3(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan1t(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan2t(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan3t(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan1e(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan2e(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan3e(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan1te(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan2te(test);
    cout << time(0) - now << endl;

    now = time(0);
    sum_kahan3te(test);
    cout << time(0) - now << endl;

    return 0;
}

2
基准测试!太好了。你会惊讶于有多少人拒绝对他们的代码进行基准测试。我很惊讶使用 .data 如此之快,这可能表明 eigen 在 .cols、.rows 和 .xs 函数中做了一些工作。如果是这种情况,你可以尝试将 xs.size() 和 xs.data() 提升出循环,如果可能的话,这可能会更有帮助(虽然这不太可能,但值得一试)。 - jleahy
xs.size() 已经超出了循环范围。我在将 xs.data() 输出时遇到了麻烦:T * xsBegin = xs.data();,因为 g++ 输出了一些关于模板的错误信息,结果发现我只是漏了一个 const:const T * xsBegin = xs.data(); - random_user
据我所知,for (size_t i = 0, size = xs.size(); i < size; i++) { }类似于{size_t i = 0, size = xs.size(); for (; i < size; i++) { } } - random_user
抱歉,我没有仔细看你的代码,也没有注意到你已经进行了优化。为了不让人们感到困惑,我撤回之前的评论。 - jleahy

14
默认情况下,Eigen按列主序(Fortran)分配矩阵(文档)。以存储顺序迭代矩阵是最快的方法,反之将增加缓存未命中次数(如果您的矩阵不适合L1,则会占用计算时间,因此请提高您的计算时间),未命中次数的增加系数为cacheline/elemsize(可能为64/8=8)。如果您的矩阵适合L1缓存,则这不会有区别,但良好的编译器应能够矢量化循环,启用AVX(在全新的Core i7上)可将速度提高多达4倍。最后,请不要期望任何Eigen的内置函数给你带来速度提升(我不认为有迭代器,但我可能错了),它们只会给你相同的(非常简单的)代码。 简而言之:交换迭代顺序,您需要更快地变化行索引。

2
你发来的链接中使用了PlainObjectBase::data(),如此写法是这样的for (int i = 0; i < Acolmajor.size(); i++),我之前不知道这个函数的存在,也许它和简单循环一样快速,并且我不必担心矩阵是按列优先还是按行优先存储。我会进行一些基准测试来验证这一点。感谢你的回答和链接! - random_user
啊,我也不知道那个函数。这样更好! - jleahy
是的!快来看看我的回答:它比另外两种方法都快得多! - random_user

4

我注意到这段代码等同于矩阵中所有条目的总和,也就是说你可以这样做:

return xs.sum();

我认为单次遍历的表现会更好,而且Eigen应该知道如何安排遍历以获得最佳性能。
然而,如果你想保留两次遍历,可以使用系数逐个减少的机制来表示,就像这样:
return (xs.array() > 0).select(xs, 0).sum() +
       (xs.array() < 0).select(xs, 0).sum();

该方法使用布尔系数逐个选择正负条目。我不知道它是否能胜过手写循环,但在理论上,采用这种编码方式可以让Eigen(和编译器)更了解您的意图,并可能改进结果。


2
尝试在循环内部将xs(i,j)存储在临时变量中,以便您只调用函数一次。

它在if/else if语句中,因此您只会调用它一次。 - jleahy
我在每次迭代中调用它两到三次。我将进行一些基准测试,以检查是否创建临时变量更快。 - random_user

0

除非你使用的处理器不支持添加两个T,否则我强烈建议完全删除if检查,并仅保留单个运行总和(并以此方式重新测试性能)。即:

   for( ... )
      sum += xs(i,j);

考虑到您已经需要知道单元格的值才能评估if条件(即您已经为内存提取付出了代价),因此避免为零值单元格添加0的好处很小。

在您发布的基准测试中,您将T设置为double。除非您使用的CPU缺乏硬件浮点支持,否则您的CPU很可能比处理条件分支更快地执行double加法操作。实际上,在Intel Core i9上运行您的代码时,我能够实现超过10倍的加速(根据矩阵大小高达16倍),只需删除if检查并始终将单元格的值添加到运行总和中。我意识到这是一个旧问题,但是在2013年左右存在的CPU情况也应该没有什么不同,除非您正在处理专用/嵌入式CPU。显然,人们应该始终在他们的目标硬件上进行测试。

PS. 这个答案假设您已经考虑了@jleahy's answer


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接