对于 Eigen 矩阵中的一列子集进行矩阵乘法

6

对于一个随机的列索引集,对 Eigen::Matrix 进行矩阵乘法的最快方法是什么?

Eigen::MatrixXd mat = Eigen::MatrixXd::Random(100, 1000);
// vector of random indices (linspaced here for brevity)
Eigen::VectorXi idx = VectorXi::LinSpaced(8,1000,9);

我正在使用RcppEigen和R,它仍然使用版本为3.x的Eigen(不支持使用索引数组的()), 而且无论如何,我的理解是()运算符仍然执行深拷贝。

现在我正在执行一个深度复制,并生成一个仅包含idx列数据的新矩阵:

template <typename T>
inline Eigen::Matrix<T, -1, -1> subset_cols(const Eigen::Matrix<T, -1, -1>& x, const std::vector<size_t>& cols) {
    Eigen::Matrix<T, -1, -1> y(x.rows(), cols.size());
    for (size_t i = 0; i < cols.size(); ++i)
        y.col(i) = x.col(cols[i]);
    return y;
}

然后进行矩阵乘法:

Eigen::MatrixXd sub_mat = subset_cols(mat, idx);
Eigen::MatrixXd a = sub_mat * sub_mat.transpose();

a 是我想要的。

一定有办法避免深度拷贝,而是使用 Eigen::Map

编辑 5/9/22: 针对@Markus提出使用原始数据访问和Eigen::Map的方法进行回复。所提出的解决方案比进行深拷贝的矩阵乘法略慢。这里使用Rcpp代码和R进行基准测试:

//[[Rcpp::depends(RcppClock)]]
#include <RcppClock.h>

//[[Rcpp::export]]
void bench(Eigen::MatrixXd mat, Eigen::VectorXi idx){
  Rcpp::Clock clock;
  size_t reps = 100;
  while(reps-- > 0){
    clock.tick("copy");
    Eigen::MatrixXd sub_mat = subset_cols(mat, idx);
    Eigen::MatrixXd a = sub_mat * sub_mat.transpose();
    clock.tock("copy");
    
    clock.tick("map");
    double *b_raw = new double[mat.rows() * mat.rows()];
    Eigen::Map<Eigen::MatrixXd> b(b_raw, mat.rows(), mat.rows());
    subset_AAt(b_raw, mat, idx);
    clock.tock("map");
  }
  clock.stop("clock");
}

以下是一个100行、100,000列矩阵的三个运算实例。我们将对(1)10列子集,(2)1000列子集和(3)10000列子集进行矩阵乘法运算。

R:

bench(
  matrix(runif(100000 * 100), 100, 100000), 
  sample(100000, 10) - 1)

# Unit: microseconds 
# ticker   mean     sd   min    max neval
#    copy  31.65  4.376 30.15  69.46   100
#     map 113.46 21.355 68.54 166.29   100

bench(
  matrix(runif(100000 * 100), 100, 100000), 
  sample(100000, 1000) - 1)

#  Unit: milliseconds 
#  ticker  mean     sd   min   max neval
#    copy 2.361 0.5789 1.972  4.86   100
#     map 9.495 2.4201 7.962 19.90   100

bench(
  matrix(runif(100000 * 100), 100, 100000), 
  sample(100000, 10000) - 1)

#  Unit: milliseconds 
#  ticker   mean     sd    min   max neval
#    copy  23.04  2.774  20.95  42.4   100
#     map 378.14 19.424 351.56 492.0   100

我在几台类似的机器上进行了基准测试,结果相似。以上结果来自一台良好的 HPC 节点。

编辑:2022 年 5 月 10 日 以下是一段代码片段,它可以像没有直接使用 Eigen BLAS 的任何代码一样快地执行矩阵乘法,但仅适用于某些列的子集:

template <typename T>
Eigen::Matrix<T, -1, -1> subset_AAt(const Eigen::Matrix<T, -1, -1>& A, const Eigen::VectorXi& cols) {
  const size_t n = A.rows();
  Eigen::Matrix<T, -1, -1> AAt(n, n);
  for (size_t k = 0; k < cols.size(); ++k) {
    const T* A_data = A.data() + cols(k) * n;
    for (size_t i = 0; i < n; ++i) {
      T tmp_i = A_data[i];
      for (size_t j = 0; j <= i; ++j) {
        AAt(i * n + j) += tmp_i * A_data[j];
      }
    }
  }
  return AAt;
}

1
我稍微试了一下。Eigen::Map不起作用,因为步幅是非等距的。在Linux上使用slicling比你的subset_cols()方式性能提高了约10%,在clang和gcc上表现更好,但在MSVC上表现较差。正如你所指出的,它在3.3分支上不可用。有一种自定义的方法可以模仿它,但在我的测试中始终表现较差。启用AVX可以获得最佳改进(快约1.5倍),甚至可以启用AVX512。 - Sedenion
@ Sedenion,感谢您在基准测试替代方法方面所做的努力。您的想法很有道理,但似乎任何收益可能只是微不足道的。是的,在我的个人使用中,我正在使用启用了AVX和Intel MKL的版本,但对于普通用户来说,性能是我首要关注的问题。 - zdebruine
2个回答

3

利用对称性

您可以利用结果矩阵将会是对称的这一特点:

Mat sub_mat = subset_cols(mat, idx); // From your original post
Mat a = Mat::Zero(numRows, numRows);
a.selfadjointView<Eigen::Lower>().rankUpdate(sub_mat); // (1)
a.triangularView<Eigen::Upper>() = a.transpose(); // (2)

第一行(1)只会计算下三角部分的a += sub_mat * sub_mat.transpose()。第二行(2)将下三角部分写入上三角部分。请参阅文档(此处此处)。如果您只需要下三角部分,可以省略步骤(2)。

对于一个100x100000的矩阵mat,当取10列时,速度提升约为:

  • ~1.1x,
  • 当取100列时,速度提升约为~1.5x,
  • 当取1000列时,速度提升约为~1.7x。

使用MSVC在Windows和使用clang在Linux上进行全优化和AVX,都可以得到以上结果。

启用并行处理

另一种加快计算速度的方法是通过编译同时启用并行处理功能。Eigen会自动处理其余部分。上述利用对称性的代码不会受益于这个功能。但原始代码可以。

Eigen::MatrixXd sub_mat = subset_cols(mat, idx);
Eigen::MatrixXd a = sub_mat * sub_mat.transpose();

对于一个 100x100000 的矩阵 mat,在 Linux 平台上使用 clang 编译器运行时,使用 4 个线程(利用了 4 个真实的 CPU 核心),相比于单线程运行,速度提升大致为:
- 当取 10 列时,即几乎没有加速:大约 ~1.0x - 当取 100 列时,速度提高了约 1.8 倍:大约 ~1.8x - 当取 1000 列时,速度提高了约 2 倍:大约 ~2.0x
换句话说,除了极少数列数较小的情况外,使用 4 个或更多核心都能优于上述对称方法。在我的测试中,仅使用 2 个核心始终较慢。需要注意的是,在我的测试中,使用SMT有时会显著降低性能。
其他注意事项:
我已经在评论中写过这个,但为了完整起见: Eigen::Map 不适用于本例,因为步长不是等距的。在 Linux 上使用 clang 和 gcc,使用切片比你的拷贝方法性能提高了约 10%,但在 MSVC 上稍微差一些。此外,正如你指出的那样,它在 Eigen 的 3.3 分支中不可用。有一种自定义方式可以模仿它,但在我的测试中性能始终较差。此外,在我的测试中,与拷贝方法相比,它并没有节省任何内存。
我认为很难在性能上超越拷贝方法本身,因为 Eigen 矩阵默认为列主序,即拷贝几列的成本相当便宜。此外,不需要真正了解细节,我怀疑 Eigen 可以在计算乘积和转置时将全部优化投入到完整矩阵中,而无需处理视图或任何类似的内容。这可能给予 Eigen 更多的向量化或缓存局部性的机会。
除了打开优化选项外,还应使用尽可能高的指令集。在我的测试中,打开 AVX 可以提高大约 1.5 倍的性能。不幸的是,我无法测试 AVX512。

非常好。对称性的观点确实很有效,绝对有帮助。谢谢! - zdebruine
@zdebruine 我在我的帖子中编辑了另一种方法,通过启用OpenMP并行化来加速计算。 - Sedenion
说实话,针对矩阵乘法来说并行化是前进的道路。如果你可以使用 OpenCL,你会发现有很多经过优化的实现可以利用GPU硬件的计算核心的共享内存,而且有了OpenCL,你也可以在必要时返回到CPU上运行。还有其他选项,但在我看来,大规模并行才是正确的答案,特别是当你有很多互不相关的矩阵时。 - StarShine
1
@zdebruine 如果我的回答对您来说可以接受,请您确认一下吗? - Sedenion
@Sedenion 当然,非常感谢。很快就会在一个广泛使用的软件包中投入生产 :) - zdebruine

1

如果有人在后面发现这个有用的话,我能够通过使用OpenMP和三角形索引来提高比接受问题中Eigen代码的性能。在这种情况下,我正在使用Rcpp :: NumericMatrix,但您也可以直接插入Eigen :: MatrixXd

    Rcpp::NumericMatrix Rcpp_AAt(const Rcpp::NumericMatrix& mat) {
    const size_t n = mat.cols();
    const size_t n_vals = n / 2 * (1 + n) - n;
    Rcpp::NumericMatrix res(n, n);
    #pragma omp parallel for
    for (size_t k = 0; k < (n_vals + n); ++k) {
        // k is linear index
        if (k >= n_vals) {
            size_t i = k - n_vals;
            double tmp = 0;
            for (size_t row = 0; row < mat.rows(); ++row)
                tmp += mat(row, i) * mat(row, i);
            res(i, i) = tmp;
        } else {
            size_t i = n - 2 - std::floor(std::sqrt(-8 * k + 4 * n * (n - 1) - 7) / 2.0 - 0.5);
            size_t j = k + i + 1 - n * (n - 1) / 2 + (n - i) * ((n - i) - 1) / 2;
            double tmp = 0;
            for (size_t row = 0; row < mat.rows(); ++row)
                tmp += mat(row, i) * mat(row, j);
            res(i, j) = tmp;
            res(j, i) = tmp;
        }
    }
    return res;
}

通过使用三角索引,我们允许OpenMP为所有列的组合生成线程,这比仅在一个时间内对一列进行并行处理更为高效(原因显而易见)。Eigen使用多线程,因此我认为这是公平竞争。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接