使用定义顺序的`Parallel.ForEach`最后一步。

5
我正在寻找一种“整洁”高效的方法来完成以下操作:先进行一个长时间的步骤1(可以并行处理),然后进行一个需要按照原始顺序进行的步骤2(如果可能,最小化在RAM中保留第一步骤数据的量),同时允许第二步骤在第一个对象的步骤1数据可用时开始,并且继续处理更多数据的步骤2。
为了更加清晰明了地表达,我需要压缩大量图片(较慢-步骤1),然后按顺序(步骤2)通过网络连接发送每张图片。在任何阶段限制RAM中准备好的压缩数据块的数量也很重要,因此例如,如果要发送1000张图片,则希望将“已完成但未发送”的图片数量限制为使用的线程/处理器数。
我已经手写了一个版本,使用了Task对象数组,但它看起来很混乱,而且我相信其他人肯定有类似的需求,所以是否有更“标准”的方法来完成这个任务呢?理想情况下,我希望有一个Parallel.ForEach的变体,其中包含两个委托-一个用于步骤1,另一个用于步骤2,并且我希望其中一个标准重载(例如包括“localFinal”参数的重载)可能会有所帮助,但事实证明这些最终阶段是“每个线程”,而不是“每个委托”。
有人能指出一个现有的优雅方法来完成这个任务吗?

3
似乎PLINQ的AsOrderedAsSequential正是设计用来解决这类问题的。你考虑过使用PLINQ吗? - Damien_The_Unbeliever
1
在我的研究中,我确实看到它出现在讨论中,但我无法看到任何机制来限制已处理但未发送数据的中间队列的大小 - 因此,在我的情况下,如果压缩运行非常快,而网络很慢,那么就没有什么可以告诉PLINQ在步骤1上减速。也许我错过了一些微妙的选项(特别是因为PLINQ对我来说非常新!) - medconn
TPL Dataflow 库是首选,因为它支持反压(带有有限容量的块)。 PLINQ库并不是作为管道基础设施而设计的,如果您尝试将其用作此类基础设施,则可能会惊讶地发现其他限制 - Theodor Zoulias
1个回答

4

您可以使用 Plinq(使用 WithDegreeOfParallelism() 限制第一阶段的并发),以及 BlockingCollection 来完成块。此外,请注意它使用 AsOrdered() 以保留原始顺序。

以下示例演示了这一点。对于您的实际应用程序,您将使用此处显示的 int 工作项替换为您的工作项类型 - 要么是文件名,要么是包含有关每个工作项的信息的类。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            int maxThreads = 4;
            int maxOutputQueueSize = 10;
            var workItems = Enumerable.Range(1, 100); // Pretend these are your files
            var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
            var worker = Task.Run(() => output(outputQueue));

            var parallelWorkItems = 
                workItems
                .AsParallel()
                .AsOrdered()
                .WithDegreeOfParallelism(maxThreads)
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(process);

            foreach (var item in parallelWorkItems)
                outputQueue.Add(item);

            outputQueue.CompleteAdding();
            worker.Wait();

            Console.WriteLine("Done.");
        }

        static int process(int value) // Pretend that this compresses the data.
        {
            Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
            Thread.Sleep(250);  // Simulate slow operation.
            return value; // Return updated work item.
        }

        static void output(BlockingCollection<int> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine($"Output is processing {item}");

            Console.WriteLine("Finished outputting.");
        }
    }
}

请注意,您可以通过WithDegreeOfParallelism限制输入队列处理和maxOutputQueueSize参数来限制输出队列的大小。

或者,如果您使用的是.Net 4.5或更高版本,则可以查看TPL Dataflow库,该库对这种类型的事情有很多支持。如果可以的话,我建议使用它 - 但是在这里描述它有点太多了。


很棒的答案,只是不要犯我犯过的错误,在每次迭代中创建新线程,否则列表顺序将不被尊重。 - Eibel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接