SIMD 优化列最大值

3

我有一个函数,我的代码中花费了大量时间。如果可能的话,我想通过矢量化-SIMD-编译器内部函数来优化它。

它基本上在列方向上找到矩阵上的最大值及其位置,并将它们存储在以下变量中:

  • val_ptr: 输入矩阵: 列优先(Fortran样式)n行乘以m列(通常n行>>m列)
  • opt_pos_ptr: 长度为n_rows的int向量,用于存储最大值的位置。进入时填充为0。
  • max_ptr: 长度为n_rows的float向量,用于存储最大值。 进入时,用val_ptr的第一列副本填充。
  • 该函数将在并行循环中调用
  • 这些内存区域保证不重叠
  • 我不需要max_ptr被填充,目前只是用于记账和避免内存分配
  • 我使用MSVC,C++17,在Windows 10上运行。旨在运行现代Intel CPU

以下是函数代码,其中模板类型应为float或double:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

我尝试过的方法:

  • 我尝试在内层循环上使用OpenMP并行for,但只有在非常大的行上才会带来一些效果,比我的当前使用略大。
  • 内层循环中的if防止了#pragma omp simd的工作,而我无法在没有它的情况下重写它。

这里似乎混淆了“行”和“列”的含义。在C语言中,我们通常将固定i的集合A[i][j]称为行,将固定j的集合A[i][j]称为列,并且C地址是这样的:&A[i][j] = &A[0][0] + i*NumberOfColumns + j,但是val_ptr + col * n_rows`与该约定相反。 - Eric Postpischil
我编辑了问题,修复了错误并添加了更多细节。你是对的:我使用列优先布局,因为我使用Armadillo来管理我的矩阵,并且通常会转到指针来完成繁重的工作,就像在这种情况下一样。 - Enzo Ferrazzano
我认为使用模板类型会使使用内部函数变得困难。如果 eT 是 double 类型,你可以使用 fmax 来获取最大值,并使用 ?: 语句替换 opt_pos_ptr 中的 if 语句。如果一定要使用模板类型,从你的问题中删除 C 标签是礼貌的举动。 - dmuir
可以给我指一下你提到的功能的文档吗?模板并不是必要的,我可以分别为浮点数和双精度浮点数编写两个实现,这是我主要的用途。 - Enzo Ferrazzano
对于fmax搜索'fmax c++'会得到一些结果,例如https://en.cppreference.com/w/cpp/numeric/math/fmax。您可以将if (cond) {opt[row] = col;}替换为opt[row] = cond ? col : opt[row]; fmax和?:看起来都可能涉及分支,但编译器可以使用条件存储代替。当然,这是否有任何改进只有测量才能告诉我们。 - dmuir
显示剩余3条评论
1个回答

4

根据您发布的代码示例,看起来您想计算垂直最大值,这意味着在您的情况下,“列”是水平的。 在C/C++中,元素的水平序列(即相邻元素在内存中相差一个元素的位置)通常称为行,而垂直序列(其中两个相邻元素的距离为行大小)则称为列。 在下面的回答中,我将使用传统术语,其中行是水平的,列是垂直的。

另外,为了简洁起见,我将关注一种可能的矩阵元素类型-float。 基本思想对于double是相同的,主要区别在于每个向量的元素数量和_ps/_pd内部选择。 我将在最后提供double版本。


想法是可以使用_mm_max_ps/_mm_max_pd并行计算多个列的垂直最大值。 为了记录找到的最大值的位置,可以将先前的最大值与当前元素进行比较。比较结果是一个掩码,其中元素全都是1,表示更新了最大值。该掩码可用于选择需要更新的位置。

我必须指出,下面的算法假定不重要记录哪个最大元素的位置,如果一列中有多个相等的最大元素。 此外,我假设矩阵不包含NaN值,这会影响比较。 更多信息请参见后文。

void find_max(const int n_cols, 
         const int n_rows, 
         const float* val_ptr,
         int* opt_pos_ptr,
         float* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 4 floats or 2 doubles
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128 mm_max = _mm_loadu_ps(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128 mm_value = _mm_loadu_ps(col_ptr);

            // See if this value is greater than the old maximum
            __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));

            // Compute the maximum
            mm_max = _mm_max_ps(mm_value, mm_max);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_ps(max_ptr + row, mm_max);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

上述代码由于混合内在函数需要SSE4.1支持。您可以将它们替换为_mm_and_si128/_ps_mm_andnot_si128/_ps_mm_or_si128/_ps的组合,这样就可以将要求降低到SSE2。有关特定内在函数的更多详细信息,包括所需的指令集扩展,请参阅Intel Intrinsics指南
关于NaN值的注意事项。如果矩阵中可能存在NaN,则_mm_cmplt_ps测试将始终返回false。至于_mm_max_ps,通常不知道它会返回什么。该内在函数所对应的maxps指令,如果其中任一操作数是NaN,则返回第二个(源)操作数,因此通过安排指令的操作数,可以实现任一行为。然而,并没有文档说明_mm_max_ps内在函数的哪个参数表示指令的哪个操作数,甚至编译器在不同情况下使用不同的关联也是可能的。有关更多详细信息,请参见答案。
为确保对NaNs的正确行为,您可以使用内联汇编以强制正确的maxps操作数顺序。不幸的是,对于您正在使用的x86-64目标MSVC而言,这并不是一个选择,因此您可以重用_mm_cmplt_ps的结果进行第二个混合,如下所示:
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);

这将抑制结果中的NaN(非数字)。如果您希望保留NaN,可以使用第二个比较来检测NaN:

// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);

// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));

如果使用更为粗暴的向量寄存器(__m256__m512),并以一个小因子展开外层循环,那么算法性能可以进一步提高,这样就可以保证每次内层循环迭代时至少会加载一整行数据到缓存中。
下面是一个使用 double 的实现例子。需要注意的重点是,由于每个向量仅包含两个元素而向量长度是四个位置,因此我们必须将外层循环展开以便同时处理两个 double 向量,并通过压缩来自与前一最大值的比较所得到的两个掩码以混合 32 位位置。
void find_max(const int n_cols, 
         const int n_rows, 
         const double* val_ptr,
         int* opt_pos_ptr,
         double* max_ptr){
    const __m128i mm_one = _mm_set1_epi32(1);

    // Pre-compute the number of rows that can be processed in full vector width.
    // In a 128-bit vector there are 2 doubles, but we want to process
    // two vectors at a time.
    int tail_size = n_rows & 3;
    int n_rows_aligned = n_rows - tail_size;
    int row = 0;
    for (; row < n_rows_aligned; row += 4)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);
            __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
            // Compress the 2 masks into one
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);
            mm_max2 = _mm_max_pd(mm_value2, mm_max2);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        _mm_storeu_pd(max_ptr + row + 2, mm_max2);
        _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    }

    // Process 2 doubles at once
    if (tail_size >= 2)
    {
        const auto* col_ptr = val_ptr + row;
        __m128d mm_max1 = _mm_loadu_pd(col_ptr);
        __m128i mm_max_pos = _mm_setzero_si128();
        __m128i mm_pos = mm_one;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            __m128d mm_value1 = _mm_loadu_pd(col_ptr);

            // See if this value is greater than the old maximum
            __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
            // Compress the mask. The upper half doesn't matter.
            __m128i mm_mask = _mm_packs_epi32(
                _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
            // If it is, save its position
            mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);

            // Compute the maximum
            mm_max1 = _mm_max_pd(mm_value1, mm_max1);

            mm_pos = _mm_add_epi32(mm_pos, mm_one);
            col_ptr += n_rows;
        }

        // Store the results
        _mm_storeu_pd(max_ptr + row, mm_max1);
        // Only store the lower two positions
        _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);

        row += 2;
    }

    // Process tail serially
    for (; row < n_rows; ++row)
    {
        const auto* col_ptr = val_ptr + row;
        auto max = *col_ptr;
        int max_pos = 0;
        col_ptr += n_rows;
        for (int col = 1; col < n_cols; ++col)
        {
            auto value = *col_ptr;
            if (value > max)
            {
                max = value;
                max_pos = col;
            }

            col_ptr += n_rows;
        }

        max_ptr[row] = max;
        opt_pos_ptr[row] = max_pos;
    }
}

1
使用 _mm_cmplt_ps_mm_max_ps 并行,而不是在之后使用 eq,可以获得更多的 ILP,并且如果小心使用,可能有助于 NaN 问题。此外,在沿着步幅进行操作时,至少要执行一个完整的缓存向量线,特别是如果数据按 64 或 128 对齐,则应该这样做,这样您就可以一次性触摸每个缓存线,而不是分开通过。 - Peter Cordes
我认为 _mm_max_ps 应该像汇编一样遵守参数顺序。但是 GCC 在 GCC7 之前即使没有 -ffast-math,也将其视为可交换的,因此在实践中,您只能在 clang、ICC 和可能的 MSVC 上安全使用它(我没有测试 MSVC)。 请参见 What is the instruction that gives branchless FP min and max on x86?。但是,如果您想要在 GCC6 及更早版本上实现可移植性,则需要假设编译器将其视为可交换的,并且不依赖于其 NaN 行为。 - Peter Cordes
@EnzoFerrazzano > 向量的宽度取决于使用的CPU类型,对吗?-- 可用的向量宽度是如此。但为了利用更宽的向量,必须编写使用更宽向量的该算法的版本,并使用基于支持的CPU功能的运行时调度来动态选择算法版本。 - Andrey Semashev
@EnzoFerrazzano > 你的代码使用哪个指令集?-- 我回答中的代码使用SSE4.1,因为它使用了混合内置函数。您可以将其替换为 _mm_and_si128/_ps_mm_andnot_si128/_ps_mm_or_si128/_ps 的组合,这样要求就会降低到SSE2。256位向量版本可能需要AVX2,而512位则需要AVX-512。 - Andrey Semashev
1
@EnzoFerrazzano 我已经更新了答案,加入了double版本。 - Andrey Semashev
显示剩余10条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接