使用Rcpp和OpenMP进行多线程和SIMD向量化的Mandelbrot

8
作为一个 OpenMPRcpp 的性能测试,我想检查使用最简单直接的 Rcpp+OpenMP 实现在 R 中计算 Mandelbrot 集合的速度有多快。目前我所做的是:
#include <Rcpp.h>
#include <omp.h>
// [[Rcpp::plugins(openmp)]]

using namespace Rcpp;

// [[Rcpp::export]]
Rcpp::NumericMatrix mandelRcpp(const double x_min, const double x_max, const double y_min, const double y_max,
                         const int res_x, const int res_y, const int nb_iter) {
  Rcpp::NumericMatrix ret(res_x, res_y);
  double x_step = (x_max - x_min) / res_x;
  double y_step = (y_max - y_min) / res_y;
  int r,c;
#pragma omp parallel for default(shared) private(c) schedule(dynamic,1) collapse(2)
  for (r = 0; r < res_y; r++) {
    for (c = 0; c < res_x; c++) {
      double zx = 0.0, zy = 0.0, new_zx;
      double cx = x_min + c*x_step, cy = y_min + r*y_step;
      int n = 0;
      for (n=0;  (zx*zx + zy*zy < 4.0 ) && ( n < nb_iter ); n++ ) {
        new_zx = zx*zx - zy*zy + cx;
        zy = 2.0*zx*zy + cy;
        zx = new_zx;
      }
      ret(c,r) = n;
    }
  }
  return ret;
}

然后在R中:

library(Rcpp)
sourceCpp("mandelRcpp.cpp")
xlims=c(-0.74877,-0.74872);
ylims=c(0.065053,0.065103);
x_res=y_res=1080L; nb_iter=10000L;
system.time(m <- mandelRcpp(xlims[[1]], xlims[[2]], ylims[[1]], ylims[[2]], x_res, y_res, nb_iter)) 
# 0.92s
rainbow=c(rgb(0.47,0.11,0.53),rgb(0.27,0.18,0.73),rgb(0.25,0.39,0.81),rgb(0.30,0.57,0.75),rgb(0.39,0.67,0.60),rgb(0.51,0.73,0.44),rgb(0.67,0.74,0.32),rgb(0.81,0.71,0.26),rgb(0.89,0.60,0.22),rgb(0.89,0.39,0.18),rgb(0.86,0.13,0.13))
    cols=c(colorRampPalette(rainbow)(100),rev(colorRampPalette(rainbow)(100)),"black") # palette
par(mar=c(0, 0, 0, 0))
system.time(image(m^(1/7), col=cols, asp=diff(ylims)/diff(xlims), axes=F, useRaster=T)) 
# 0.5s

enter image description here

我不确定除了OpenMP多线程之外,是否还有其他明显的速度提升方法可以利用,例如通过simd向量化?(在openmp #pragma中使用simd选项似乎没有任何效果)

首先我的代码一开始崩溃了,但后来我发现通过将ret[r,c] = n;替换为ret(r,c) = n;可以解决这个问题。如下面的答案所建议的使用Armadillo类可以使事情稍微快一点,尽管时间几乎相同。还将xy翻转,以便在使用image()绘制时呈现正确的方向。使用8个线程速度比向量化的普通R Mandelbrot版本here快约350倍,也比(非多线程)Python/Numba版本here(类似于PyCUDA或PyOpenCL速度)快约7.3倍,所以对此感到非常满意... 现在似乎是R中光栅化/显示的瓶颈....


通常情况下,我通过避免在相同轮廓区域和M集上进行迭代来提高速度(使用C和汇编迭代)。远离M集边界时,大面积都包含在轮廓内部,因此我开发了一种曲线拼接方法来跟踪轮廓边界,然后填充轮廓。迭代越深,则收益越好。如果意外地剪掉了一个芽,可能会有惩罚,并且我不知道在使用线程时如何使用这种方法。当进行双倍缩放时,还可以通过已知1/4点的方式实现节省。 - Weather Vane
其实并不是这样,因为这样就不再使用简单的逃逸时间算法了,也不再得到连续的数字返回,而是得到固定迭代次数的结果,详见https://en.wikipedia.org/wiki/Mandelbrot_set#Continuous_(smooth)_coloring。 - Tom Wenseleers
请参考以下 Python 代码示例:https://www.ibm.com/developerworks/community/blogs/jfp/entry/My_Christmas_Gift?lang=en 其中介绍了两种方法... - Tom Wenseleers
感谢您的回复。虽然已经有一段时间了,但也许我应该学习一下您提供的链接并重新审视我的屏幕保护程序。关于平滑轮廓的着色:眼睛甚至能够察觉到颜色的微小变化。关于在M-Set边界附近着色:我尝试了许多不同的基于与邻居之间的差异来选择清晰细节的着色算法。关于播放:我从存储的关键图像中插值出中间帧,并且需要使用不同的着色算法来过滤掉边界附近的像素闪烁。 - Weather Vane
是的,我很想看看如何在我的代码中使其工作!(如果可以不使用内联汇编的话 :-) ) - Tom Wenseleers
显示剩余3条评论
2个回答

6

不要在使用Rcpp*Vector*Matrix对象时使用OpenMP,因为它们掩盖了单线程的SEXP函数/内存分配。OpenMP是一种多线程方法

这就是代码崩溃的原因。

绕过此限制的一种方法是使用非R数据结构来存储结果。以下任何一种都足够: arma::matEigen::MatrixXdstd::vector<T>... 因为我喜欢armadillo,所以我将把res矩阵从Rcpp::NumericMatrix更改为arma::mat。 因此,以下内容将并行执行您的代码:

#include <RcppArmadillo.h> // Note the changed include and new attribute
// [[Rcpp::depends(RcppArmadillo)]]

// Avoid including header if openmp not on system
#ifdef _OPENMP
#include <omp.h>
#endif
// [[Rcpp::plugins(openmp)]]

// Note the changed return type
// [[Rcpp::export]]
arma::mat mandelRcpp(const double x_min, const double x_max,
                     const double y_min, const double y_max,
                     const int res_x, const int res_y, const int nb_iter) {
  arma::mat ret(res_x, res_y); // note change
  double x_step = (x_max - x_min) / res_x;
  double y_step = (y_max - y_min) / res_y;
  unsigned r,c;

  #pragma omp parallel for shared(res)
  for (r = 0; r < res_y; r++) {
    for (c = 0; c < res_x; c++) {
      double zx = 0.0, zy = 0.0, new_zx;
      double cx = x_min + c*x_step, cy = y_min + r*y_step;
      unsigned n = 0;
      for (;  (zx*zx + zy*zy < 4.0 ) && ( n < nb_iter ); n++ ) {
        new_zx = zx*zx - zy*zy + cx;
        zy = 2.0*zx*zy + cy;
        zx = new_zx;
      }

      if(n == nb_iter) {
        n = 0;
      }

      ret(r, c) = n;
    }
  }

  return ret;
}

使用测试代码(注意yx未定义,因此我假设y = ylimsx = xlims),我们得到:

xlims = ylims = c(-2.0, 2.0)

x_res = y_res = 400L
nb_iter = 256L

system.time(m <-
              mandelRcpp(xlims[[1]], xlims[[2]],
                         ylims[[1]], ylims[[2]], 
                         x_res, y_res, nb_iter))

rainbow = c(
  rgb(0.47, 0.11, 0.53),
  rgb(0.27, 0.18, 0.73),
  rgb(0.25, 0.39, 0.81),
  rgb(0.30, 0.57, 0.75),
  rgb(0.39, 0.67, 0.60),
  rgb(0.51, 0.73, 0.44),
  rgb(0.67, 0.74, 0.32),
  rgb(0.81, 0.71, 0.26),
  rgb(0.89, 0.60, 0.22),
  rgb(0.89, 0.39, 0.18),
  rgb(0.86, 0.13, 0.13)
)

cols = c(colorRampPalette(rainbow)(100),
         rev(colorRampPalette(rainbow)(100)),
         "black") # palette
par(mar = c(0, 0, 0, 0))

image(m,
      col = cols,
      asp = diff(range(ylims)) / diff(range(xlims)),
      axes = F)

对于:

输入图像描述


2
你可以在这些对象上使用新的SIMD结构。关于私有变量,那些变量是私有的。因此,对于每个私有变量,你都会在内存中为每个线程显式地创建一个单独的副本。不确定是否会有收益。 - coatless
哈,我现在明白了 - 谢谢!我尝试过以下这些指令: #pragma omp parallel for simd #pragma omp for simd #pragma omp simd 但似乎都无法提高性能... - Tom Wenseleers
@TomWenseleers 你需要手动进行向量化。这种优化对于编译器来说太过高级了。你必须保存先完成的像素,并使用掩码来查找何时所有像素都完成,然后再移动到下一个像素。 - Z boson
@TomWenseleers 你在用哪个编译器?我可以给你一个优雅的解决方案,适用于GCC、Clang和可能的ICC,但不适用于MSVC。 - Z boson
@TomWenseleers,我建议您使用向量扩展。请看我在这里制作的表格https://stackoverflow.com/a/43778723/2542702 - Z boson
显示剩余7条评论

5

我使用GCC和Clang的向量扩展,将OP的代码向量化。在展示如何进行向量化之前,让我先展示以下硬件的性能:

Skylake (SKL) at 3.1 GHz with 4 cores
Knights Landing (KNL) at 1.5 GHz with 68 cores
ARMv8 Cortex-A57 arch64 (Nvidia Jetson TX1) 4 cores at ? GHz

nb_iter = 1000000
                        GCC             Clang
SKL_scalar              6m5,422s
SKL_SSE41               3m18,058s
SKL_AVX2                1m37,843s       1m39,943s
SKL_scalar_omp          0m52,237s
SKL_SSE41_omp           0m29,624s       0m31,356s
SKL_AVX2_omp            0m14,156s       0m16,783s

ARM_scalar              15m28.285s
ARM_vector              9m26.384s
ARM_scalar_omp          3m54.242s
ARM_vector_omp          2m21.780s

KNL_scalar              19m34.121s
KNL_SSE41               11m30.280s
KNL_AVX2                5m0.005s        6m39.568s
KNL_AVX512              2m40.934s       6m20.061s
KNL_scalar_omp          0m9.108s
KNL_SSE41_omp           0m6.666s        0m6.992s
KNL_AVX2_omp            0m2.973s        0m3.988s
KNL_AVX512_omp          0m1.761s        0m3.335s

KNL相对于SKL的理论加速比为

(68 cores/4 cores)*(1.5 GHz/3.1 Ghz)*
(8 doubles per lane/4 doubles per lane) = 16.45

我详细介绍了GCC和Clang的向量扩展功能在这里。为了将OP的代码向量化,我们需要定义三个额外的向量操作。 1. 广播 对于向量v和标量s,GCC无法执行v = s,但Clang可以。但我发现了一种不错的解决方案,适用于GCC和Clang在这里。例如:
vsi v = s - (vsi){};

2. 一个类似于OpenCL中的any()函数或者像R中的。

我想到的最好的方式是创建一个通用函数。

static bool any(vli const & x) {
  for(int i=0; i<VLI_SIZE; i++) if(x[i]) return true;
  return false;
}

Clang实际上使用ptest指令为此生成相对高效的代码(但不适用于AVX512),但GCC则不是这样。 3. 压缩 计算使用64位双精度浮点数完成,但结果会写成32位整数。因此,使用64位整数进行了两次计算,然后将这两个计算压缩成一个32位整数向量。我想出了一种通用解决方案,Clang做得很好。
static vsi compress(vli const & lo, vli const & hi) {
  vsi lo2 = (vsi)lo, hi2 = (vsi)hi, z;
  for(int i=0; i<VLI_SIZE; i++) z[i+0*VLI_SIZE] = lo2[2*i];
  for(int i=0; i<VLI_SIZE; i++) z[i+1*VLI_SIZE] = hi2[2*i];
  return z;
}

以下解决方案在GCC上效果更好,但对于Clang来说并没有改善。但由于此函数不是关键性的,我只使用通用版本。

static vsi compress(vli const & low, vli const & high) {
#if defined(__clang__)
  return __builtin_shufflevector((vsi)low, (vsi)high, MASK);
#else
  return __builtin_shuffle((vsi)low, (vsi)high, (vsi){MASK});
#endif
}

这些定义不依赖于任何x86特定的内容,下面定义的代码也可以在ARM处理器上使用GCC和Clang编译。
现在这些已经定义好了,下面是代码。
#include <string.h>
#include <inttypes.h>
#include <Rcpp.h>

using namespace Rcpp;

#ifdef _OPENMP
#include <omp.h>
#endif
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp14)]]

#if defined ( __AVX512F__ ) || defined ( __AVX512__ )
static const int SIMD_SIZE = 64;
#elif defined ( __AVX2__ )
static const int SIMD_SIZE = 32;
#else
static const int SIMD_SIZE = 16;
#endif

static const int VSI_SIZE = SIMD_SIZE/sizeof(int32_t);
static const int VLI_SIZE = SIMD_SIZE/sizeof(int64_t);
static const int VDF_SIZE = SIMD_SIZE/sizeof(double);

#if defined(__clang__)
typedef int32_t vsi __attribute__ ((ext_vector_type(VSI_SIZE)));
typedef int64_t vli __attribute__ ((ext_vector_type(VLI_SIZE)));
typedef double  vdf __attribute__ ((ext_vector_type(VDF_SIZE)));
#else
typedef int32_t vsi __attribute__ ((vector_size (SIMD_SIZE)));
typedef int64_t vli __attribute__ ((vector_size (SIMD_SIZE)));
typedef double  vdf __attribute__ ((vector_size (SIMD_SIZE)));
#endif

static bool any(vli const & x) {
  for(int i=0; i<VLI_SIZE; i++) if(x[i]) return true;
  return false;
}

static vsi compress(vli const & lo, vli const & hi) {
  vsi lo2 = (vsi)lo, hi2 = (vsi)hi, z;
  for(int i=0; i<VLI_SIZE; i++) z[i+0*VLI_SIZE] = lo2[2*i];
  for(int i=0; i<VLI_SIZE; i++) z[i+1*VLI_SIZE] = hi2[2*i];
  return z;
}

// [[Rcpp::export]]
IntegerVector frac(double x_min, double x_max, double y_min,  double y_max, int res_x, int res_y, int nb_iter) {
  IntegerVector out(res_x*res_y);
  vdf x_minv = x_min - (vdf){}, y_minv = y_min - (vdf){};
  vdf x_stepv = (x_max - x_min)/res_x - (vdf){}, y_stepv = (y_max - y_min)/res_y - (vdf){};
  double a[VDF_SIZE] __attribute__ ((aligned(SIMD_SIZE)));
  for(int i=0; i<VDF_SIZE; i++) a[i] = 1.0*i;
  vdf vi0 = *(vdf*)a;

  #pragma omp parallel for schedule(dynamic) collapse(2)
  for (int r = 0; r < res_y; r++) {
    for (int c = 0; c < res_x/(VSI_SIZE); c++) {
      vli nv[2] = {0 - (vli){}, 0 - (vli){}};
      for(int j=0; j<2; j++) {
        vdf c2 = 1.0*VDF_SIZE*(2*c+j) + vi0;
        vdf zx = 0.0 - (vdf){}, zy = 0.0 - (vdf){}, new_zx;
        vdf cx = x_minv + c2*x_stepv, cy = y_minv + r*y_stepv;
        vli t = -1 - (vli){};
        for (int n = 0; any(t = zx*zx + zy*zy < 4.0) && n < nb_iter; n++, nv[j] -= t) {
          new_zx = zx*zx - zy*zy + cx;
          zy = 2.0*zx*zy + cy;
          zx = new_zx;
        }
      }
      vsi sp = compress(nv[0], nv[1]);
      memcpy(&out[r*res_x + VSI_SIZE*c], (int*)&sp, SIMD_SIZE);
    }
  }
  return out;
}

这段 R 代码几乎与原帖作者的代码相同

library(Rcpp)
sourceCpp("frac.cpp", verbose=TRUE, rebuild=TRUE)                                                                                                                                                         
xlims=c(-0.74877,-0.74872);
ylims=c(0.065053,0.065103);
x_res=y_res=1080L; nb_iter=100000L;

t = system.time(m <- frac(xlims[[1]], xlims[[2]], ylims[[1]], ylims[[2]], x_res, y_res, nb_iter))
print(t)
m2 = matrix(m, ncol = x_res)

rainbow = c(
  rgb(0.47, 0.11, 0.53),
  rgb(0.27, 0.18, 0.73),
  rgb(0.25, 0.39, 0.81),
  rgb(0.30, 0.57, 0.75),
  rgb(0.39, 0.67, 0.60),
  rgb(0.51, 0.73, 0.44),
  rgb(0.67, 0.74, 0.32),
  rgb(0.81, 0.71, 0.26),
  rgb(0.89, 0.60, 0.22),
  rgb(0.89, 0.39, 0.18),
  rgb(0.86, 0.13, 0.13)
)

cols = c(colorRampPalette(rainbow)(100),
         rev(colorRampPalette(rainbow)(100)),"black") # palette                                                                                                                  
par(mar = c(0, 0, 0, 0))
image(m2^(1/7), col=cols, asp=diff(ylims)/diff(xlims), axes=F, useRaster=T)

如需为GCC或Clang编译,请更改文件~/.R/Makevars中的内容为:

CXXFLAGS= -Wall -std=c++14 -O3 -march=native -ffp-contract=fast -fopenmp
#uncomment the following two lines for clang    
#CXX=clang-5.0
#LDFLAGS= -lomp

如果您在Clang上无法使用OpenMP,请参见this


这段代码生成的图像与原图几乎相同。 在此输入图片描述


1
@TomWenseleers 我不想要这个被接受的投票。你能把它还给原来的人吗? - Z boson
1
谢谢,听起来非常棒和有趣!对于颜色,这只是一个简单的伽马颜色变换,以使颜色均衡一些 - 最好使用的伽马系数可能会有所不同。为了避免这种情况,最后我转而使用直方图均衡化,https://en.wikipedia.org/wiki/Histogram_equalization,因为它总是返回令人愉悦的颜色渐变...像http://www.fractalforums.com/fractal-exteme/smooth-shading-mandelbrot-plugin/或https://www.ibm.com/developerworks/community/blogs/jfp/entry/My_Christmas_Gift?lang=en中使用平滑着色也很好。 - Tom Wenseleers
1
@TomWenseleers,你的Mandelbrot存储库看起来真的很酷!我没有写关于摄动法的东西。我想我用的链接现在已经失效了。让我看看能否找到一个有效的链接。 - Z boson
1
https://math.stackexchange.com/questions/939270/perturbation-of-mandelbrot-set-fractal - Z boson
1
http://www.science.eclipse.co.uk/sft_maths.pdf - Z boson
显示剩余19条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接