快速的按字节替换方法

9

我有一个函数,可以将二进制数据从一个区域复制到另一个区域,但仅当字节与特定值不同时才进行复制。以下是代码示例:

void copy_if(char* src, char* dest, size_t size, char ignore)
{
  for (size_t i = 0; i < size; ++i)
  {
    if (src[i] != ignore)
      dest[i] = src[i];
  }
}

问题在于这种方法对于我当前的需求来说太慢了。有没有一种更快的方法来获得相同的结果呢?

更新: 根据答案,我尝试了两种新的实现方式:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        char temps = src[i];
        char tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const __m128i vignore = _mm_set1_epi8(ignore);

    size_t i;

    for (i = 0; i + 16 <= size; i += 16)
    {
        __m128i v = _mm_loadu_si128((__m128i *)&src[i]);
        __m128i vmask = _mm_cmpeq_epi8(v, vignore);
        vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
        _mm_maskmoveu_si128(v, vmask, (char *)&dest[i]);
    }
    for (; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }

}

我得到了以下结果:

Naive: 
Duration: 2.04844s
Vectorized: 
Pass: PASS
Duration: 3.18553s
SIMD: 
Pass: PASS
Duration: 0.481888s

我猜我的编译器(最新版MSVC)未能进行向量化,但是使用SIMD方案已经足够好了,谢谢!

更新(第二次) 我使用一些#pragma指令为我的编译器(MSVC)进行了向量化,事实上它比SIMD更快,这是最终的代码:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{

#pragma loop(hint_parallel(0))
#pragma loop(ivdep)

for (int i = 0; i < size; ++i) // Sadly no parallelization if i is unsigned, but more than 2Go of data is very unlikely
{
    char temps = src[i];
    char tempd = dest[i];
    dest[i] = temps == ignore ? tempd : temps;
}
}

1
并行化?几个线程分别处理复制的一部分? - Some programmer dude
2
如果您被限制在特定的架构上,例如x86,则这将是SIMD实现的良好选择(例如SSE-请参见[maskmovdqu / _mm_maskmoveu_si128](https://software.intel.com/sites/landingpage/IntrinsicsGuide/#expand=3268,3266,3266,3266&text=maskmovdqu))。 - Paul R
并行化可能是一个选项,但我已经在一个有很多线程的系统中了,我想避免再创建更多。实际上,我受限于x64架构。 - J. Edmond
1
请使用restrict关键字。如果您不确保指针不别名,编译器将无法矢量化任何内容。 - user3528438
“太慢”是什么意思?你要移动多大的数组?如果它们足够大,你将受到内存带宽的限制,没有任何优化可以帮助你。 - Art
显示剩余12条评论
4个回答

6

gcc向量化以下代码:

#include <stddef.h>
void copy_if(char* src, char* dest, size_t size, char ignore)
{
  for (size_t i = 0; i < size; ++i)
  {
    char temps = src[i];
    char tempd = dest[i];
    dest[i] = temps == ignore ? tempd : temps;
  }
}

请注意,dest[i] 的加载和赋值都是无条件的,因此编译器不受禁止在多线程程序中发明存储的限制所限制。
为了适应更现代的编译器和处理器,并提供godbolt链接:
在x86-64 gcc 11.1 编译 该代码时,使用 -O3 -mavx512f -mavx512bw 编译选项,生成处理64字节的对齐循环。
.L5:
        vmovdqu8        (%rdi,%rax), %zmm2
        vpcmpb  $4, %zmm0, %zmm2, %k1
        vmovdqu8        %zmm2, (%rsi,%rax){%k1}
        addq    $64, %rax
        cmpq    %rax, %r8
        jne     .L5

这个编译器也可以很好地处理gcc -std=gnu11 -O3 -mavx2相关的内容,一次处理32字节:
.L5:
        vpcmpeqb        (%rdi,%rax), %ymm1, %ymm0
        vmovdqu (%rdi,%rax), %ymm2
        vpblendvb       %ymm0, (%rsi,%rax), %ymm2, %ymm0
        vmovdqu %ymm0, (%rsi,%rax)
        addq    $32, %rax
        cmpq    %rax, %r8
        jne     .L5

一般情况下, 现代 编译器 对于 带有向量单元的任何处理器架构都表现良好

旧编译器(gcc 4.8.4),旧处理器(没有AVX512),旧答案:

对于-march=core-avx2,生成的汇编包含此向量化循环,每次处理32个字节:

.L9:
    vmovdqu (%rdi,%rcx), %ymm1
    addq    $1, %r10
    vmovdqu (%rsi,%rcx), %ymm2
    vpcmpeqb    %ymm0, %ymm1, %ymm3
    vpblendvb   %ymm3, %ymm2, %ymm1, %ymm1
    vmovdqu %ymm1, (%rsi,%rcx)
    addq    $32, %rcx
    cmpq    %r10, %r8
    ja  .L9

对于通用的x86-64,生成的汇编代码包含这个矢量化循环,每次处理16个字节:

.L9:
    movdqu  (%rdi,%r8), %xmm3
    addq    $1, %r10
    movdqa  %xmm3, %xmm1
    movdqu  (%rsi,%r8), %xmm2
    pcmpeqb %xmm0, %xmm1
    pand    %xmm1, %xmm2
    pandn   %xmm3, %xmm1
    por %xmm2, %xmm1
    movdqu  %xmm1, (%rsi,%r8)
    addq    $16, %r8
    cmpq    %r9, %r10
    jb  .L9

对于armv7l-neon,clang-3.7生成以下循环,每次处理16个字节:
.LBB0_9:                                @ %vector.body
                                        @ =>This Inner Loop Header: Depth=1
        vld1.8  {d18, d19}, [r5]!
        subs.w  lr, lr, #16
        vceq.i8 q10, q9, q8
        vld1.8  {d22, d23}, [r4]
        vbsl    q10, q11, q9
        vst1.8  {d20, d21}, [r4]!
        bne     .LBB0_9

所以,与汇编或指令集相比,该代码不仅更易读,而且还可以在多个架构和编译器上使用,具有可移植性。通过重新编译,可以轻松地利用新的架构和指令集扩展。

使用Intel 编译器 13.1.3.198,我不得不标记指针为 restrict 才能获得矢量化。否则,它会抱怨 “向量依赖”,我认为这是正确的(考虑 dst == src+1)。因此,我的建议是为指针添加 restrict 修改器以增加在各种平台和编译器上实现矢量化的机会。 - njuffa
如果指定了“假设无别名”标志(可以在命令行上显式指定,也可以隐含地作为特定优化级别的一部分),则某些编译器可能会在不使用restrict的情况下进行矢量化。 - njuffa
1
@njuffa:嗯,gcc只是在运行时检查重叠。它需要大约8个简单的指令(leaorsetcc)。你可以通过使指针“限定”来摆脱它们,但为什么要费心呢? - EOF
这是一个非常好的观点,尽管我熟悉这种技术,但我却忽略了它。 - njuffa

5

以下是使用SSE2指令利用maskmovdqu指令的示例。在Haswell CPU上,SIMD版本的速度似乎比原始版本快约两倍(使用clang编译的代码):

    #include <stdio.h>
    #include <string.h>
    #include <emmintrin.h>  // SSE2
    #include <sys/time.h>   // gettimeofday

    void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
    {
        for (size_t i = 0; i < size; ++i)
        {
            if (src[i] != ignore)
                dest[i] = src[i];
        }
    }

    void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
    {
        const __m128i vignore = _mm_set1_epi8(ignore);

        size_t i;

        for (i = 0; i + 16 <= size; i += 16)
        {
            __m128i v = _mm_loadu_si128((__m128i *)&src[i]);
            __m128i vmask = _mm_cmpeq_epi8(v, vignore);
            vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
            _mm_maskmoveu_si128 (v, vmask, (char *)&dest[i]);
        }
        for ( ; i < size; ++i)
        {
            if (src[i] != ignore)
                dest[i] = src[i];
        }
    }

    #define TIME_IT(init, copy_if, src, dest, size, ignore) \
    do { \
        const int kLoops = 1000; \
        struct timeval t0, t1; \
        double t_ms = 0.0; \
     \
        for (int i = 0; i < kLoops; ++i) \
        { \
            init; \
            gettimeofday(&t0, NULL); \
            copy_if(src, dest, size, ignore); \
            gettimeofday(&t1, NULL); \
            t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
        } \
        printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
    } while (0)

    int main()
    {
        const size_t N = 10000000;

        uint8_t *src = malloc(N);
        uint8_t *dest_ref = malloc(N);
        uint8_t *dest_init = malloc(N);
        uint8_t *dest_test = malloc(N);

        for (size_t i = 0; i < N; ++i)
        {
            src[i] = (uint8_t)rand();
            dest_init[i] = (uint8_t)rand();
        }

        memcpy(dest_ref, dest_init, N);
        copy_if_ref(src, dest_ref, N, 0x42);

        memcpy(dest_test, dest_init, N);
        copy_if_SSE(src, dest_test, N, 0x42);
        printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

        TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
        TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);

        return 0;
    }

编译和测试:

$ gcc -Wall -msse2 -O3 copy_if.c && ./a.out 
copy_if_SSE: PASS
copy_if_ref: 0.416 ns / element
copy_if_SSE: 0.239 ns / element

(注:此答案的早期版本在计时代码中有一个错误系数为16,因此早期数字比应有的高16倍。)


更新

受@EOF解决方案和编译器生成的代码的启发,我尝试了一种不同的SSE4方法,并获得了更好的结果:

#include <stdio.h>
#include <string.h>
#include <smmintrin.h>  // SSE4
#include <sys/time.h>   // gettimeofday

void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }
}

void copy_if_EOF(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        char temps = src[i];
        char tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const __m128i vignore = _mm_set1_epi8(ignore);

    size_t i;

    for (i = 0; i + 16 <= size; i += 16)
    {
        __m128i vsrc = _mm_loadu_si128((__m128i *)&src[i]);
        __m128i vdest = _mm_loadu_si128((__m128i *)&dest[i]);
        __m128i vmask = _mm_cmpeq_epi8(vsrc, vignore);
        vdest = _mm_blendv_epi8(vsrc, vdest, vmask);
        _mm_storeu_si128 ((__m128i *)&dest[i], vdest);
    }
    for ( ; i < size; ++i)
    {
        if (src[i] != ignore)
            dest[i] = src[i];
    }
}

#define TIME_IT(init, copy_if, src, dest, size, ignore) \
do { \
    const int kLoops = 1000; \
    struct timeval t0, t1; \
    double t_ms = 0.0; \
 \
    for (int i = 0; i < kLoops; ++i) \
    { \
        init; \
        gettimeofday(&t0, NULL); \
        copy_if(src, dest, size, ignore); \
        gettimeofday(&t1, NULL); \
        t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
    } \
    printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
} while (0)

int main()
{
    const size_t N = 10000000;

    uint8_t *src = malloc(N);
    uint8_t *dest_ref = malloc(N);
    uint8_t *dest_init = malloc(N);
    uint8_t *dest_test = malloc(N);

    for (size_t i = 0; i < N; ++i)
    {
        src[i] = (uint8_t)rand();
        dest_init[i] = (uint8_t)rand();
    }

    memcpy(dest_ref, dest_init, N);
    copy_if_ref(src, dest_ref, N, 0x42);

    memcpy(dest_test, dest_init, N);
    copy_if_EOF(src, dest_test, N, 0x42);
    printf("copy_if_EOF: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

    memcpy(dest_test, dest_init, N);
    copy_if_SSE(src, dest_test, N, 0x42);
    printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");

    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_EOF, src, dest_test, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);

    return 0;
}

编译和测试:

$ gcc -Wall -msse4 -O3 copy_if_2.c && ./a.out 
copy_if_EOF: PASS
copy_if_SSE: PASS
copy_if_ref: 0.419 ns / element
copy_if_EOF: 0.114 ns / element
copy_if_SSE: 0.114 ns / element

结论: 虽然从功能角度来看,_mm_maskmoveu_si128似乎是解决这个问题的好办法,但它不如使用显式加载、屏蔽和存储效率高。此外,在这种情况下,编译器生成的代码(参见@EOF的答案)似乎与显式编码的SIMD一样快。


1
@EOF:我刚意识到我的时序代码误差因子为16(我使用了一些旧代码,之前没有发现)。我已经更新了上面的原始SSE2实现,并添加了一个SSE4版本,灵感来自于您解决方案的编译器生成的代码。现在时间基本匹配。我认为自动向量化的代码对于这种特定情况很难被超越。 - Paul R
2
不要忘记提到maskmovdqu带有非暂态提示,因此它完成时会将目标从缓存中驱逐。在Haswell上,这是一个10-uop指令,吞吐量为每6个周期的一个。vmaskmovps/pdvpmaskmovd/q没有NT提示,并且速度更快(特别是如果您在读取目标的同时它仍然在缓存中)。但是它们只按32b或64b粒度进行屏蔽。没有AVX2 vmaskmovdqu ymm,只有仍然带有NT提示的128b版本。 - Peter Cordes
1
SSE:总是提供几乎你想要的指令,但不完全!我发现这在查看汇编代码时尤其正确,而不仅仅是内部函数。避免额外的mov指令很难,因为通常所需的数据移动会在目标位置就地进行。位/字节移位无明显原因地在原地发生。每个移位都有自己的操作码,它们只是不使用/r字段选择不同的目标。它们都可以像pshufd一样,有一个单独的目标。 :/ - Peter Cordes
2
@PeterCordes:幸运的是,AVX(2)为x86向量指令提供了合理的编码。但我希望英特尔和AMD能停止制造不支持它的新机器。 - EOF
1
(更正之前的评论; SIMD立即移位,如psllw xmm0,2确实使用/r字段获取额外的操作码位,66 0F 71 /6 ibpsraw/4不同。也许我看的是从另一个寄存器中获取移位计数的psllw xmm,xmm/m128,这当然需要两个ModRM字段) - Peter Cordes
显示剩余2条评论

0

以下内容将是一种改进,尽管编译器本身也可以设计出这种改进。

void copy_if(char* src, char* dest, size_t size, char ignore)
{
  while (size--)
  {
    if (*src != ignore)
      *dest = *src;
    src++; dest++;
  }
}

你的实现与原始实现不同 - 当你在拷贝过程中忽略字节时,在目标数组中不会留下“空洞”。(而且while循环中的size_t显然是一个笔误)。- 我同意你的观点,大多数现代编译器应该能够自行处理这个问题。 - tofro
@tofro 抱歉,但我很好奇。你能告诉我这与原问题有何不同吗?对我来说,它们看起来都在做同样的事情。 - muXXmit2X
原始版本即使字符没有被复制也会递增dest指针,从而在目标数组中创建“空洞”。Paul的版本在这种情况下不会递增它,因此可以通过任何跳过复制来移动数组内容。我现在看到它显然已经被编辑过了,所以不再是这种情况了。请忽略我的评论。 - tofro
@tofro和其他人,我看到了并且修改了答案。Tofro,你可能是在参考旧答案。对于造成的困惑,我感到抱歉。 - Paul Ogilvie

0
如果忽略的频率不是太高,那么像下面这样的memcpy代码可能会有所帮助。
size_t copy_if(char* src, char* dest, size_t size, char ignore)
{
    size_t i=0, count =0 , res= 0;
    while (count < size)
    {
    while (*src != ignore){
        count++;
        if (count > size)
             break;
        src++;
        i++;
        res++;
    }
    count++;
    if (i> 0){
        memcpy(dest,src-i, i);
        dest += i;
    }
    i = 0;
    src++;
  }
return res;
}

使用大多数(全部?)memcpy实现时,您现在将传递数组两次。一次用于搜索忽略的出现,第二次(隐藏)在memcpy()内部。很可能不比以前更好;) - tofro

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接