并行化直方图函数

6

我已经实现了一个简单函数的普通版本和并行版本,用于计算32bppArgb位图的直方图。正常版本在1920x1080像素的图像上大约需要0.03秒的时间,而并行版本则需要0.07秒。

线程开销真的那么大吗?除了Parallel.For之外,还有其他构造可以加速这个过程吗?我需要加快这个过程,因为我要处理每秒30帧的视频。

以下是简化版代码:

public sealed class Histogram
{
    public int MaxA = 0;
    public int MaxR = 0;
    public int MaxG = 0;
    public int MaxB = 0;
    public int MaxT = 0;

    public int [] A = null;
    public int [] R = null;
    public int [] G = null;
    public int [] B = null;

    public Histogram ()
    {
        this.A = new int [256];
        this.R = new int [256];
        this.G = new int [256];
        this.B = new int [256];

        this.Initialize();
    }

    public void Initialize ()
    {
        this.MaxA = 0;
        this.MaxR = 0;
        this.MaxG = 0;
        this.MaxB = 0;
        this.MaxT = 0;

        for (int i = 0; i < this.A.Length; i++)
            this.A [i] = 0;
        for (int i = 0; i < this.R.Length; i++)
            this.R [i] = 0;
        for (int i = 0; i < this.G.Length; i++)
            this.G [i] = 0;
        for (int i = 0; i < this.B.Length; i++)
            this.B [i] = 0;
    }

    public void ComputeHistogram (System.Drawing.Bitmap bitmap, bool parallel = false)
    {
        System.Drawing.Imaging.BitmapData data = null;

        data = bitmap.LockBits
        (
            new System.Drawing.Rectangle(0, 0, bitmap.Width, bitmap.Height),
            System.Drawing.Imaging.ImageLockMode.ReadOnly,
            System.Drawing.Imaging.PixelFormat.Format32bppArgb
        );

        try
        {
            ComputeHistogram(data, parallel);
        }
        catch
        {
            bitmap.UnlockBits(data);

            throw;
        }

        bitmap.UnlockBits(data);
    }

    public void ComputeHistogram (System.Drawing.Imaging.BitmapData data, bool parallel = false)
    {
        int stride = System.Math.Abs(data.Stride);

        this.Initialize();

        if (parallel)
        {
            unsafe
            {
                System.Threading.Tasks.Parallel.For
                (
                    0,
                    data.Height,
                    new System.Threading.Tasks.ParallelOptions() { MaxDegreeOfParallelism = System.Environment.ProcessorCount },
                    y =>
                    {
                        byte* pointer = ((byte*) data.Scan0) + (stride * y);

                        for (int x = 0; x < stride; x += 4)
                        {
                            this.B [pointer [x + 0]]++;
                            this.G [pointer [x + 1]]++;
                            this.R [pointer [x + 2]]++;
                            this.A [pointer [x + 3]]++;
                        }
                    }
                );
            }
        }
        else
        {
            unsafe
            {
                for (int y = 0; y < data.Height; y++)
                {
                    byte* pointer = ((byte*) data.Scan0) + (stride * y);

                    for (int x = 0; x < stride; x += 4)
                    {
                        this.B [pointer [x + 0]]++;
                        this.G [pointer [x + 1]]++;
                        this.R [pointer [x + 2]]++;
                        this.A [pointer [x + 3]]++;
                    }
                }
            }
        }

        for (int i = 0; i < this.A.Length; i++)
            if (this.MaxA < this.A [i]) this.MaxA = this.A [i];
        for (int i = 0; i < this.R.Length; i++)
            if (this.MaxR < this.R [i]) this.MaxR = this.R [i];
        for (int i = 0; i < this.G.Length; i++)
            if (this.MaxG < this.G [i]) this.MaxG = this.G [i];
        for (int i = 0; i < this.B.Length; i++)
            if (this.MaxB < this.B [i]) this.MaxB = this.B [i];

        if (this.MaxT < this.MaxA) this.MaxT = this.MaxA;
        if (this.MaxT < this.MaxR) this.MaxT = this.MaxR;
        if (this.MaxT < this.MaxG) this.MaxT = this.MaxG;
        if (this.MaxT < this.MaxB) this.MaxT = this.MaxB;
    }
}

2
你尝试过让每个线程计算不止一行吗?也许让它们处理10-20行可以加快速度。 - Mohammed Hossain
1
对于传递到“Parallel.For”的lambda表达式,请尝试从“y”循环到“y”+(必须找到的某个最佳数字)。当然,这意味着调整“Parallel.For”的第二个参数,从“data.Height”调整为其他内容。 - Mohammed Hossain
你如何确保直方图桶的更新是原子的?你可能会收到线程串行化的性能。 - Mark Ransom
@gabba:这是不可能的,因为一个图像的输出会作为下一个图像的输入。 - Raheel Khan
@FredF:我不能对样本进行采样,因为后续操作依赖于此结果。 - Raheel Khan
显示剩余8条评论
2个回答

9
首先,你的并行循环中存在一个严重的错误:多个线程会访问、增加和更新共享数组——仅仅在同一图像上运行你的示例代码多次就会因固有竞争条件而导致极为不同的结果。
但这不是你所询问的问题。
至于为什么使用并行实现会导致性能下降,简单地说,可能是因为每个并行任务体内的工作量不足以抵消创建新任务、安排任务等“启动成本”。
更为关键的是,我认为你正在通过所有内存跳跃来大量占用L1/L2缓存——每个任务线程都会尝试将其认为需要的内容加载到缓存中,但由于你正在随意索引,因此不再创建一致的访问模式,所以每次尝试访问位图缓冲区或内部数组时都可能会出现缓存未命中。
还有一种同样高效的方式可以获取位图的只读数据,而不需要使用不安全的代码...实际上,我们先这样做:
通过调用LockBits,你获得了指向非托管内存的指针。让我们复制它:
System.Drawing.Imaging.BitmapData data = null;
data = bitmap.LockBits
(
    new System.Drawing.Rectangle(0, 0, bitmap.Width, bitmap.Height),
    System.Drawing.Imaging.ImageLockMode.ReadOnly,
    System.Drawing.Imaging.PixelFormat.Format32bppArgb
);

// For later usage
var imageStride = data.Stride;
var imageHeight = data.Height;

// allocate space to hold the data
byte[] buffer = new byte[data.Stride * data.Height];

// Source will be the bitmap scan data
IntPtr pointer = data.Scan0;

// the CLR marshalling system knows how to move blocks of bytes around, FAST.
Marshal.Copy(pointer, buffer, 0, buffer.Length);

// and now we can unlock this since we don't need it anymore
bitmap.UnlockBits(data);

ComputeHistogram(buffer, imageStride, imageHeight, parallel);

关于竞争条件 - 您可以通过使用Interlocked调用来提高计数的方式以合理的性能克服此问题(注意!!! 多线程编程很难,我的解决方案可能并不完美!)

public void ComputeHistogram (byte[] data, int stride, int height, bool parallel = false)
{
    this.Initialize();

    if (parallel)
    {
        System.Threading.Tasks.Parallel.For
        (
            0,
            height,
            new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
            y =>
            {
                int startIndex = (stride * y);
                int endIndex = stride * (y+1);
                for (int x = startIndex; x < endIndex; x += 4)
                {
                    // Interlocked actions are more-or-less atomic 
                    // (caveats abound, but this should work for us)
                    Interlocked.Increment(ref this.B[data[x]]);
                    Interlocked.Increment(ref this.G[data[x+1]]);
                    Interlocked.Increment(ref this.R[data[x+2]]);
                    Interlocked.Increment(ref this.A[data[x+3]]);
                }
            }
        );
    }
    else
    {
        // the original way is ok for non-parallel, since only one
        // thread is mucking around with the data
    }

    // Sorry, couldn't help myself, this just looked "cleaner" to me
    this.MaxA = this.A.Max();
    this.MaxR = this.R.Max();
    this.MaxG = this.G.Max();
    this.MaxB = this.B.Max();
    this.MaxT = new[] { this.MaxA, this.MaxB, this.MaxG, this.MaxR }.Max();
}

那么,这对运行时行为有什么影响吗?

没有太大影响,但至少并行分支现在计算正确的结果了。 :)

使用一个非常简单的测试装置:

void Main()
{    
    foreach(var useParallel in new[]{false, true})
    {
        var totalRunTime = TimeSpan.Zero;
        var sw = new Stopwatch();
        var runCount = 10;
        for(int run=0; run < runCount; run++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
            sw.Reset();
            sw.Start();
            var bmp = Bitmap.FromFile(@"c:\temp\banner.bmp") as Bitmap;
            var hist = new Histogram();
            hist.ComputeHistogram(bmp, useParallel);
            sw.Stop();
            totalRunTime = totalRunTime.Add(sw.Elapsed);
        }
        Console.WriteLine("Parallel={0}, Avg={1} ms", useParallel, totalRunTime.TotalMilliseconds / runCount);
    }
}

我得到的结果如下所示:

Parallel=False, Avg=1.69777 ms
Parallel=True, Avg=5.33584 ms

如您所见,我们还没有回答您最初的问题。 :)

那么让我们试着让并行工作“更好”:

让我们看看将任务“增加工作量”会发生什么:

if (parallel)
{
    var batchSize = 2;
    System.Threading.Tasks.Parallel.For
    (
        0,
        height / batchSize,
        new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
        y =>
        {
            int startIndex = (stride * y * batchSize);
            int endIndex = startIndex + (stride * batchSize);
            for (int x = startIndex; x < endIndex; x += 4)
            {
                // Interlocked actions are more-or-less atomic 
                // (caveats abound, but this should work for us)
                Interlocked.Increment(ref this.B[data[x]]);
                Interlocked.Increment(ref this.G[data[x+1]]);
                Interlocked.Increment(ref this.R[data[x+2]]);
                Interlocked.Increment(ref this.A[data[x+3]]);
            }
        }
    );
}

结果:

Parallel=False, Avg=1.70273 ms
Parallel=True, Avg=4.82591 ms

哦,这看起来很有前途……我想知道当我们改变batchSize时会发生什么?
让我们这样改变我们的测试装置:
void Main()
{    
    foreach(var useParallel in new[]{false, true})
    {
        for(int batchSize = 1; batchSize < 1024; batchSize <<= 1)
        {
            var totalRunTime = TimeSpan.Zero;
            var sw = new Stopwatch();
            var runCount = 10;
            for(int run=0; run < runCount; run++)
            {
                GC.Collect();
                GC.WaitForPendingFinalizers();
                GC.Collect();
                sw.Reset();
                sw.Start();
                var bmp = Bitmap.FromFile(@"c:\temp\banner.bmp") as Bitmap;
                var hist = new Histogram();
                hist.ComputeHistogram(bmp, useParallel, batchSize);
                sw.Stop();
                totalRunTime = totalRunTime.Add(sw.Elapsed);
            }
            Console.WriteLine("Parallel={0}, BatchSize={1} Avg={2} ms", useParallel, batchSize, totalRunTime.TotalMilliseconds / runCount);
        }        
    }
}

结果:(仅显示parallel=true,因为non-parallel不会改变)

Parallel=True, BatchSize=1 Avg=5.57644 ms
Parallel=True, BatchSize=2 Avg=5.49982 ms
Parallel=True, BatchSize=4 Avg=5.20434 ms
Parallel=True, BatchSize=8 Avg=5.1721 ms
Parallel=True, BatchSize=16 Avg=5.00405 ms
Parallel=True, BatchSize=32 Avg=4.44973 ms
Parallel=True, BatchSize=64 Avg=2.28332 ms
Parallel=True, BatchSize=128 Avg=1.39957 ms
Parallel=True, BatchSize=256 Avg=1.29156 ms
Parallel=True, BatchSize=512 Avg=1.28656 ms

我们似乎在批处理大小达到64-128范围时接近一个渐近线,当然,您的里程可能会因位图大小等而异。
希望这能帮到您!这是我在等待生产构建完成的有趣消遣! :)

谢谢!这样的回答是具有感染力的,可以鼓励SO用户回答更多问题。太棒了。 - Raheel Khan
关于memcopy,我猜你这样做只是为了避免不安全的代码,对吗? - Raheel Khan
我在想是否有一种可以编程计算最佳批处理大小的方法,该方法基于图像大小。当然,你可以使用启发式方法,但这种方法在不同的机器上无法良好地移植。或者在另一个线程中使用类似于您的测试装置进行运行时调整。 - Raheel Khan
1
嗨 @RaheelKhan - 很高兴你觉得它有用!是的,试图避免 unsafe 调用是其中一部分;另一部分是尝试减少需要获取/锁定位图的长度。至于最佳批处理大小?我只能猜测,但我想最佳批处理大小、像素扫描线的大小(即 4 字节 * 宽度,或 ~stride),以及 L1/L2/(L3) 缓存的大小之间应该存在关系。 - JerKimball

1
创建线程会有相当大的开销。虽然执行速度可能比单线程版本快得多,但完成速度太快,无法弥补这个初始开销。
如果你每帧都这样做,那只会拖慢你的速度。
然而,如果你手动创建线程池,手动分配工作,并在每一帧重用线程,你可能会发现到第二或第三帧时,你的代码超过了单线程版本。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接