如何并行处理队列并保持顺序

3
我正在开发一个用于视频分析的C#程序,基本操作如下:
  1. 获取视频帧
  2. 处理帧
  3. 在帧上显示注释
  4. 将结果保存到硬盘
  5. 重复以上步骤
如果步骤2-4需要的时间超过了一帧的时间(30毫秒),则程序会丢失一些帧,这并不是我想要的。
通过并行化可以降低平均时间,从而减少丢帧的可能性。然而,输出必须与输入的顺序相同。例如,带注释和结果的帧必须按正确顺序出现。
目前我提高程序速度的方法是将处理过程分成不同的部分,并将它们分别放入BlockingCollection中,例如使用不同的Tasks进行以下操作:
  1. 将帧添加到BC#1
  2. 从BC#1中获取帧,处理后将结果放入BC#2
  3. 从BC#2中取得结果,添加注释并显示,保存到磁盘。
与其将处理过程的每一部分拆分成不同的线程,我希望有一个线程池,每个线程都能完整地处理一帧。问题是如何保持结果的顺序。
例如,我可以让每个线程将结果放入BlockingCollection中,但如果第二帧的处理比第一帧先完成,则结果将是无序的。
  • 有没有想法如何在C#中实现这一点?
  • 有哪些C#类或库可以使用?
  • 我应该使用什么来创建线程池以最大化性能?
更新: 由于该程序用于控制某些仪器,因此结果应该接近实时地显示给用户。因此,在程序运行后进行排序可能不可行。

MassTransit nuget包含有InMemory消息队列系统。您可以通过参数发布Message,其中一个消费者将接收该消息并执行相同的过程。但是,消费操作按顺序执行,返回结果不关心顺序。如果您逐个消费,则结果不会失去顺序属性。 - Adem Catamak
你可以为每个帧使用一个包装类来指示顺序,然后在保存到磁盘之前立即对整个集合进行排序吗? - SpruceMoose
@CalC 应该按照它们的到达顺序进行处理,因为处理后的结果应该实时显示在用户界面上。 - geometrikal
你做到哪一步了?你尝试过使用System.Reactive,也被称为反应式扩展吗?我正在使用它并取得了良好的结果。 - heltonbiker
1个回答

1

最简单的方法可能是使用PLinQ中的.AsParallel()

var processedFrames = frames.AsParallel()
                            .AsOrdered()
                            .Select(Process)
                            .ToList();

假设有一个方法。
ProcessedFrame Process(UnprocessedFrame original)
{
    // ...
}

这将处理整个序列,并返回一个有序的结果帧序列。

这很有趣,我不知道PLinQ。然而,帧应该按照它们进入的顺序进行处理,因为结果也需要实时显示给用户。 - geometrikal
你不需要应用AsOrdered吗?我认为默认情况下,AsParallel不保留顺序。 - Evk

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接