如何在C#中快速地从一个ushort数组中减去另一个ushort数组?

5
我需要快速减去ushort类型的数组A中每个值与具有相同长度的ushort类型数组B中相应索引值。此外,如果差值为负数,则需要存储零而不是负数差值。
(确切地说,长度为327680,因为我正在从一个相同大小的图像中减去一个640x512的图像)。
下面的代码目前需要约20ms,如果可能的话我想将其降至约5ms以下。使用unsafe代码是可以的,但请提供示例,因为我不太擅长编写unsafe代码。
谢谢!
public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}

更新:尽管这不是纯粹的C#,但为了其他人的利益,我最终在我的解决方案中添加了一个C++ CLR类库,并使用以下代码。它运行时间大约为3.1毫秒。如果使用未管理的C++库,则运行时间为2.2毫秒。考虑到时间差异很小,我决定使用托管库。

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}

然后我这样调用它:
    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }

20毫秒听起来相当慢(也许你的机器配置较低?)。“以防万一”,你是在运行没有调试的发布版本吗? - Ergwun
p/invoke和使用PSUBW? - Yaur
你们在处理灰度图像吗? - nick_w
@GeoffBattye:我的电脑是Win7 64位i5。我正在运行调试版本。 - nb1forxp
@nick_w:是的,我正在处理灰度图像。 - nb1forxp
我已经更新了我的答案,使用了一个新的更快的方法,它使用了C++/CLI。 - Ergwun
6个回答

5

一些基准测试结果。

  1. SubtractBackgroundFromBuffer: 这是原问题中提出的方法。
  2. SubtractBackgroundFromBufferWithCalcOpt: 这是在 TTat 的改进下对原方法进行扩展以提高计算速度的方法。
  3. SubtractBackgroundFromBufferParallelFor: Selman22 的答案中提供的解决方案。
  4. SubtractBackgroundFromBufferBlockParallelFor: 我的答案。与 3. 类似,但将处理分成了 4096 值的块。
  5. SubtractBackgroundFromBufferPartitionedParallelForEach: Geoff 的第一个答案。
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack: Geoff 的第二个答案。

更新

有趣的是,我可以通过使用 Bruno Costa 建议的方法略微提高(约为 6%)SubtractBackgroundFromBufferBlockParallelFor 的速度。

Buffer[i] = (ushort)Math.Max(difference, 0);

替代

if (difference >= 0)
    Buffer[i] = (ushort)difference;
else
    Buffer[i] = 0;

结果

注意,这是每次运行中1000次迭代的总时间。

SubtractBackgroundFromBuffer(ms):                                 2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60

从这些结果来看,最好的方法是结合计算优化来获得小的收益,并利用Parallel.For在图像块上进行操作。当然,您的效果会有所不同,并且并行代码的性能取决于您运行的CPU。

测试工具

我以发布模式为每种方法运行此程序。我以这种方式启动和停止Stopwatch,以确保只测量处理时间。

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);

for (int i = 0; i < 1000; i++)
{
    Buffer = GenerateRandomBuffer(327680, 128011992);                

    sw.Start();
    SubtractBackgroundFromBuffer(bgImg);
    sw.Stop();
}

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));


public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
    ushort[] buffer = new ushort[327680];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
    }

    return buffer;
}

方法

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }
}

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        if (Buffer[index] < backgroundBuffer[index])
        {
            Buffer[index] = 0;
        }
        else
        {
            Buffer[index] -= backgroundBuffer[index];
        }
    }
}

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
    Parallel.For(0, Buffer.Length, (i) =>
    {
        int difference = Buffer[i] - backgroundBuffer[i];
        if (difference >= 0)
            Buffer[i] = (ushort)difference;
        else
            Buffer[i] = 0;
    });
}        

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
    int blockSize = 4096;

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
    {
        for (int i = j * blockSize; i < (j + 1) * blockSize; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];

            Buffer[i] = (ushort)Math.Max(difference, 0);                    
        }
    });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        }
    });
}

@BrunoCosta 1. 我不确定我理解你的意思。你所说的“再次分区”是什么意思?2. 你认为这不是在整个数组上操作的原因是什么?块大小是一个有点随意的选择,也许值得进一步进行基准测试。 - nick_w
我完全误解了这段代码...但我仍然相信Parallel.Foreach会自动进行分区。 分区意味着许多线程可以被分配来处理你的4096块。 但也许我搞错了... - Bruno Costa
@BrunoCosta 从文档来看,你传递给Parallel.For的委托将在每次迭代中执行一次。我认为从这点可以确定每次迭代都会在单个线程中运行。 - nick_w
你还应该尝试在本地变量中存储对Buffer的引用(在并行版本中,在委托内部执行此操作)。据我所知,优化器不会为您执行此操作,因为另一个线程可能会更改Buffer的值。此外,如果JIT不能确定数组实例不会更改,则无法消除冗余边界检查。 - Daniel
@Daniel 我刚刚尝试了一下,也就是将代码 var localBuffer = Buffer; 放在 Parallel.For 委托中,但由于某种原因,代码运行速度明显变慢了。很奇怪。 - nick_w
感谢大家提供的伟大建议!我希望能将其中几个标记为解决方案,因为很多人对整体解决方案做出了贡献。但现在,我必须把解决方案给予nick_w,因为他的测试工具、各种方法的列举、基准测试和完整性都非常出色。使用Parallel.ForEach和Math.Max,我将平均执行时间降低到约9毫秒。再次感谢大家! - nb1forxp

5

这是一个有趣的问题。

只有在测试结果不会为负数(如TTat和Maximum Cookie所建议的)之后执行减法,对性能影响微乎其微,因为JIT编译器已经可以执行此优化。

并行化任务(如Selman22所建议的)是一个好主意,但当循环速度像这种情况下一样快时,开销最终会超过收益,因此在我的测试中Selman22的实现实际上运行得更慢。我怀疑nick_w的基准测试是在调试器附加的情况下产生的,隐藏了这个事实。

将任务分成更大的块(如nick_w所建议的)可以解决开销问题,并且实际上可以产生更快的性能,但您不必自己计算块 - 您可以使用Partitioner来为您完成此操作:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

上述方法在我的测试中始终优于nick_w's手写的分块方法。
但是!事情并不止于此。
真正拖慢代码速度的罪魁祸首不是赋值或算术运算。而是if语句。它对性能的影响将会受到您正在处理的数据性质的重大影响。 nick_w's基准测试为两个缓冲区生成相同量级的随机数据。然而,我怀疑您实际上在后台缓冲区中具有更低的平均量级数据。由于分支预测(如this classic SO answer所解释),这些细节可能非常重要。
当后台缓存中的值通常小于缓存中的值时,JIT编译器可以注意到这一点,并相应地优化该分支。当每个缓冲区中的数据来自相同的随机总体时,无法以高于50%的准确度猜测 if 语句的结果。正是在这种后一种情况下,nick_w正在进行基准测试,并在这些条件下,我们可能通过使用不安全代码将布尔值转换为整数并完全避免分支来进一步优化您的方法。(请注意,以下代码依赖于有关内存中如何表示bool的实现细节,虽然它适用于.NET 4.5中的您的情况,但不一定是一个好主意,并且仅出于说明目的而在此显示。)
public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((int*)(&nonNegative)));
                }
            }
        });
}

如果你真的想节省更多时间,那么你可以通过切换语言到C++/CLI以更安全的方式遵循这种方法,因为这将允许你在算术表达式中使用布尔值而不需要诉诸不安全的代码:
UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}

你可以使用C++/CLI创建一个纯托管的DLL,暴露上述静态方法,然后在你的C#代码中使用它:
public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            Buffer[i] = 
                MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
        }
    });
}

这比上面的hacky不安全的C#代码表现更好。事实上,它非常快,以至于您可以使用C++/CLI编写整个方法,而忘记并行化,它仍然可以优于其他技术。

使用nick_w的测试工具包, 上述方法将优于此前在此处发布的任何其他建议。以下是我得到的结果(1-4是他尝试的案例,5-7是本答案中概述的案例):

1. SubtractBackgroundFromBuffer(ms):                               2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                    2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms):                    3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms):               1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):     1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    499.27

然而,在我预期的情况下,背景值通常较小,成功的分支预测可以全面提高结果,并且避免if语句的“hack”实际上会更慢:

当我将背景缓冲区中的值限制在范围0-6500(约占缓冲区的10%)时,使用nick_w's test harness得到的结果如下:

1. SubtractBackgroundFromBuffer(ms):                                 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                      915.91
3. SubtractBackgroundFromBufferParallelFor(ms):                    2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms):                 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):       658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    494.12

你可以看到,结果1-5都有了显著的改善,因为它们现在受益于更好的分支预测。结果6和7没有太大变化,因为它们避免了分支。
数据的这种变化完全改变了情况。在这种情况下,即使是最快的全C#解决方案,现在也只比你的原始代码快15%。
底线:一定要用代表性数据测试任何选择的方法,否则你的结果将毫无意义。

你正在将bool强制转换为int(无效),并且正在假设true bool的数字值(无效-不能保证是1)。不过我喜欢避免分支的一般想法。 - usr
@usr 是的,那段代码确实可以工作,但你说得对,依赖于这个实现细节并不是一个好主意 - 我会澄清这一点。我怀疑在 OP 的数据下,这种 hack 实际上会更慢,正如我在答案中所说的那样。 - Ergwun
@usr 我现在已经使用C++/CLI添加了一个安全版本的分支避免技术。 - Ergwun
有趣的结果。这仍然是托管和JIT编译的,对吧?我想知道发出了哪些指令,使得它运行得如此之快。 - usr
很好了解。.NET JIT本身无法进行该转换,因此C++编译器正在提供帮助。 - usr
显示剩余2条评论

1
你可以尝试使用 Parallel.For
Parallel.For(0, Buffer.Length, (i) =>
{
    int difference = Buffer[i] - backgroundBuffer[i];
    if (difference >= 0)
          Buffer[i] = (ushort) difference;
    else
         Buffer[i] = 0;
}); 

更新:我已经尝试过了,发现在你的情况下差异很小,但是当数组变得更大时,差异也会变得更大。

enter image description here


@elgonzo Parallel.For在每次迭代中不会创建新任务:Parallel.For是否在每个迭代中使用一个任务? - MarcinJuraszek
这可能会节省一些减法和转换周期: 如果(Buffer [i] <= backgroundBuffer [i]){ Buffer [i] = 0; } else { Buffer [i] -= backgroundBuffer [i]; } - TTat
不过这可能会影响性能,但我会使用Buffer[i] = Math.max(0, difference)。(也许你可以对此进行基准测试) - Bruno Costa

1
你可以通过在执行减法操作之前先检查结果是否为负数来获得轻微的性能提升。这样,如果结果将为负数,则无需执行减法操作。示例:
if (Buffer[index] > backgroundBuffer[index])
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]);
else
    Buffer[index] = 0;

这取决于Jitter如何将IL代码编译成汇编语言。即使它能提高速度,也不会超过几微秒。 - Bruno Costa

0
这里是使用 Zip() 的解决方案:
Buffer = Buffer.Zip<ushort, ushort, ushort>(backgroundBuffer, (x, y) =>
{
    return (ushort)Math.Max(0, x - y);
}).ToArray();

它的表现不如其他答案好,但绝对是最短的解决方案。


0

关于这个,

Enumerable.Range(0, Buffer.Length).AsParalell().ForAll(i =>
    {
         unsafe
        {
            var nonNegative = Buffer[i] > backgroundBuffer[i];
            Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                *((int*)(&nonNegative)));
        }
    });

这个比使用PartitionerParallel.Foreach慢大约10倍。令人惊讶的是它落后得那么远。 - Ergwun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接