使用OpenMP线程和std :: (experimental ::) simd计算Mandelbrot集

3
我希望实现一个简单的Mandelbrot集合绘制程序,使用不同种类的HPC范式来展示它们的优势和劣势,以及它们的实现难易程度。考虑到GPGPU(CUDA/OpenACC/OpenMP4.5)、线程/OpenMP和MPI。并使用这些示例为新手程序员提供指导,并展示可能性。代码的清晰度比从硬件中获得绝对最高性能更重要,那是第二步;)
因为该问题容易并行化,而且现代CPU可以通过矢量指令获得大量性能,所以我还想结合OpenMP和SIMD。不幸的是,简单地添加#pragma omp simd并不能产生令人满意的结果,使用内嵌函数也不太友好或具有未来可扩展性,如此处所示。
幸运的是,正在进行C++标准的工作,使得通用地实现矢量指令应该更容易,如TS中所述:“并行扩展,版本2”,特别是第9节数据并行类型。可以在此处找到WIP实现,它基于可以在此处找到的VC。
假设我有以下类(已更改为使其更简单)。
#include <stddef.h>

using Range = std::pair<double, double>;
using Resolution = std::pair<std::size_t, std::size_t>;

class Mandelbrot
{
    double* d_iters;
    Range d_xrange;
    Range d_yrange;
    Resolution d_res;
    std::size_t d_maxIter;
    
public:
    Mandelbrot(Range xrange, Range yrange, Resolution res, std::size_t maxIter);
    ~Mandelbrot();

    void writeImage(std::string const& fileName);
    void computeMandelbrot();
private:
    void calculateColors();
}; 

以下是使用OpenMP实现的computeMandelbrot()

void Mandelbrot::computeMandelbrot()
{
    double dx = (d_xrange.second - d_xrange.first) / d_res.first;
    double dy = (d_yrange.second - d_yrange.first) / d_res.second;

    #pragma omp parallel for schedule(dynamic)
    for (std::size_t row = 0; row != d_res.second; ++row)
    {
        double c_imag = d_yrange.first + row * dy;
        for (std::size_t col = 0; col != d_res.first; ++col)
        {
            double real = 0.0;
            double imag = 0.0;
            double realSquared = 0.0;
            double imagSquared = 0.0;
            double c_real = d_xrange.first + col * dx;

            std::size_t iter = 0;
            while (iter < d_maxIter && realSquared + imagSquared < 4.0)
            {
                realSquared = real * real;
                imagSquared = imag * imag;
                imag = 2 * real * imag + c_imag;
                real = realSquared - imagSquared + c_real;
                ++iter;
            }
            d_iters[row * d_res.first + col] = iter;
        }   
    }
}

我们可以假设 x 和 y 方向的分辨率都是 2/4/8/... 的倍数,具体取决于我们使用哪些 SIMD 指令。
不幸的是,在 std::experimental::simd 上线上几乎找不到任何信息。就我所知,也没有任何非平凡的示例。
在 Vc git 存储库中,有一个 Mandelbrot 集计算器的实现,但它相当复杂,并且由于缺少注释而难以跟踪。
很明显,我应该改变函数 computeMandelbrot() 中 double 的数据类型,但我不确定应该改成什么类型。 TS 提到了两种主要的新数据类型,适用于某种类型 T,
native_simd = std::experimental::simd;

fixed_size_simd = std::experimental::simd>;
使用 native_simd 最有意义,因为我不知道编译时的界限。但是我不清楚这些类型代表什么,native_simd 是单个 double 还是执行矢量指令的多个 double 集合?这个集合中有多少个 double?
如果有人能向我指出使用这些概念的示例,或者给我一些关于如何使用 std::experimental::simd 实现矢量指令的指导,我将非常感激。

1
事实上,SIMD Mandelbrot 是一个艰难的权衡,因为每个像素都有独立的退出条件。您需要手动进行矢量化,例如一直进行直到向量中的所有像素都“逃脱”(但仍记录每个像素第一次逃脱的时间,例如基于比较结果的增量向量的屏蔽,如果您想要对集合外的区域进行着色)。或者跟踪当前在向量中的像素,并在其逃脱时替换一个像素(使用混合),等等。但这会增加很多簿记和洗牌开销。 - Peter Cordes
1
简而言之,曼德博集合中的数据并行性很难利用SIMD(除非始终以最大迭代次数幼稚地运行,对于集合外的像素来说非常慢);您可能需要一个公开更多机器特定操作的SIMD API。哦,这个C++扩展具有bool any_of(const simd_mask<T, Abi>&)和类似函数来测试vec1 < vec2 simd_mask结果,例如x86的movmskpd_mm_movemask_pd)让您根据每个元素比较结果的所有/任意进行分支。因此,您可以使用它来实现Mandelbrot,但我建议先选择一个更适合SIMD的问题。 - Peter Cordes
1
如果你以前没有做过任何SIMD相关的工作,可以尝试向量点积或数组的线性搜索。(这些都与OpenMP线程级并行无关,虽然大多数编译器可以使用OMP simd自动向量化数组点积。) - Peter Cordes
@PeterCordes 这也是为什么向量内积对我来说不是一个很好的例子,因为编译器通常会做正确的事情。这会让人觉得向量化就像添加一个编译指示符或者只是信任icc的自动向量化一样简单。有时候是真的,但往往并非如此。 - Nigel Overmars
@PeterCordes 我主要是在寻找那些已经尝试过这个新的 std::simd 的人,因为目前我还没有找到任何例子。这很有道理,因为它仍然是实验性的。但这也是发布这个问题的原因之一,以便其他想要使用这个构造的人可以找到这个问题。 - Nigel Overmars
显示剩余2条评论
1个回答

0

这里是一个非常基本的实现,它可以工作(据我所知)。对于向量中哪些元素的绝对值大于2的测试是以一种非常繁琐和低效的方式完成的。肯定有更好的方法来做到这一点,但我还没有找到。

在 AMD Ryzen 5 3600 上使用选项-march=znver2给 g++ 带来了约72%的性能提升,这比预期的要少。

template <class T>
void mandelbrot(T xstart, T xend,
                T ystart, T yend)
{
    namespace stdx = std::experimental;

    constexpr auto simdSize = stdx::native_simd<T>().size();
    constexpr unsigned size = 4096;
    constexpr unsigned maxIter = 250;

    assert(size % simdSize == 0);
    unsigned* res = new unsigned[size * size];

    T dx = (xend - xstart) / size;
    T dy = (yend - ystart) / size;

    for (std::size_t row = 0; row != size; ++row)
    {
        T c_imag = ystart + row * dy;
        for (std::size_t col = 0; col != size; col += simdSize)
        {
            stdx::native_simd<T> real{0};
            stdx::native_simd<T> imag{0};
            stdx::native_simd<T> realSquared{0};
            stdx::native_simd<T> imagSquared{0};
            stdx::fixed_size_simd<unsigned, simdSize> iters{0};

            stdx::native_simd<T> c_real;
            for (int idx = 0; idx != simdSize; ++idx)
            {
                c_real[idx] = xstart + (col + idx) * dx;
            }

            for (unsigned iter = 0; iter != maxIter; ++iter)
            {
                realSquared = real * real;
                imagSquared = imag * imag;
                auto isInside = realSquared + imagSquared > stdx::native_simd<T>{4};
                for (int idx = 0; idx != simdSize; ++idx)
                {
                    // if not bigger than 4, increase iters
                    if (!isInside[idx])
                    {
                        iters[idx] += 1;
                    }
                    else
                    {
                        // prevent that they become inf/nan
                        real[idx] = static_cast<T>(4);
                        imag[idx] = static_cast<T>(4);
                    }
                }

                if (stdx::all_of(isInside) )
                {
                    break;
                }        

                imag = static_cast<T>(2.0) * real * imag + c_imag;
                real = realSquared - imagSquared + c_real;
            }
            iters.copy_to(res + row * size + col, stdx::element_aligned);
        }

    }
    delete[] res;
}

整个测试代码(从auto test = (...)开始)编译成以下内容:
  .L9:
  vmulps ymm1, ymm1, ymm1
  vmulps ymm13, ymm2, ymm2
  xor eax, eax
  vaddps ymm2, ymm13, ymm1
  vcmpltps ymm2, ymm5, ymm2
  vmovaps YMMWORD PTR [rsp+160], ymm2
  jmp .L6
.L3:
  vmovss DWORD PTR [rsp+32+rax], xmm0
  vmovss DWORD PTR [rsp+64+rax], xmm0
  add rax, 4
  cmp rax, 32
  je .L22
.L6:
  vucomiss xmm3, DWORD PTR [rsp+160+rax]
  jp .L3
  jne .L3
  inc DWORD PTR [rsp+96+rax]
  add rax, 4
  cmp rax, 32
  jne .L6

请注意,比较操作返回一个全为0或全为1的向量,即整数0 / -1。 iters -= compare; 增加0或1,即 -(-1)。 您希望使用FP比较结果作为int32_t或uint32_t向量的vcmpltps / vpsubd汇编指令,因此您可能需要进行一些强制类型转换以使编译器对类型满意。 - Peter Cordes
@PeterCordes 在我的系统上,它是一个全为0或全为1(即正数)的向量。所以我可以直接执行 iters[idx] += isInside[idx]。但是我仍然有分歧的问题,最终会导致NaN...我会再试试,非常感谢任何提示! - Nigel Overmars
这不是我得到的结果。auto foo (stdx::native_simd<float> v) { return v == v; }编译成一个只执行cmpeqps %xmm0, %xmm0 / ret的函数。因此,它返回一个全1位的向量,表示整数-1。(根据英特尔的汇编手册https://www.felixcloutier.com/x86/cmpps)。你需要做一些其他的事情来将其转换为一个`0x00000000`或`0x00000001`的向量,而不是`0xffffffff`。 - Peter Cordes
simd_mask<T> 的内部对象表示是我们想要的,但我不知道是否有一种可移植的方法可以将其作为整数减去。 叹气,尝试实现可移植性通常会导致某些有用的技巧无法实现,因为抽象 API 与机器实际操作之间存在不匹配。(对于除 AVX512 外的所有向量 ISAs) - Peter Cordes
我已经给这个库的作者写了一封电子邮件,希望他能够给出一些指导,告诉我如何正确地做! - Nigel Overmars
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接