使用Parallel.ForEach并保留结果的顺序。

10
我有一个 `List`,我想将每个 `byte[]` 反序列化为 `Foo`。这个 `List` 是有序的,我想写一个并行循环,使得结果的 `List` 包含所有 `Foo`,并且顺序与原始的 `byte[]` 相同。由于列表的规模很大,进行并行操作是值得的。有没有内置的方法可以实现这个?
如果没有,有没有任何想法可以加速比起同步运行的速度?

请发布一些示例代码,记录一下,并不是让事情并行化就一定会更快更高效...不知道Parallel.For<T>是否能解决您的问题。 - adt
3
这是一个重复问题的链接:https://dev59.com/LXA65IYBdhLWcg3wxRs8为了回答这个问题,你可以使用PLINQ(AsOrdered、AsParallel)来完成工作。 - Kadelka
最好有SelectMap的并行等效版本以保留输入顺序。 - leppie
2
@adt,如果我有代码,我就不需要问了;-) 如果这有帮助的话,我可以添加一个简单的for循环? - Matt
点赞了你的评论,有道理:)) 但是看代码总是更容易看到。 - adt
显示剩余6条评论
3个回答

10

根据您提供的信息,我理解您想要一个大小与输入字节数组相等的Foo输出数组?是这样吗?

如果是这样,那么操作很简单。不必担心锁定或同步结构,这些将消耗并行化带来的所有加速效果。

相反,如果您遵守这个简单的规则,任何算法都可以在没有锁定或同步的情况下并行化:

对于每个处理的输入元素X[i],您可以从任何输入元素X[j]读取,但只能写入输出元素Y[i]

enter image description here

查找Scatter / Gather,这种操作称为gather,因为只有一个输出元素被写入。

如果您可以使用上述原则,则需要预先创建输出数组Foo [],并在输入数组上使用Parallel.For而不是ForEach。

例如:

        List<byte[]> inputArray = new List<byte[]>();
        int[] outputArray = new int[inputArray.Count];

        var waitHandle = new ManualResetEvent(false);
        int counter = 0;

        Parallel.For(0, inputArray.Count, index =>
            {
                // Pass index to for loop, do long running operation 
                // on input items
                // writing to only a single output item
                outputArray[index] = DoOperation(inputArray[index]);

                if(Interlocked.Increment(ref counter) == inputArray.Count -1)
                {
                    waitHandle.Set();
                }
            });

        waitHandler.WaitOne();

        // Optional conversion back to list if you wanted this
        var outputList = outputArray.ToList();

我将您的答案标记为所需解决方案,但最终我通过svick的帮助使用了自定义的IPropagatorBlock和TPLDataflow。虽然这个问题和数据流似乎是两个非常不同的概念,但在IPropagatorBlock中对数据块进行并行处理远远优于任何Parallel.For循环,特别是针对我的具体问题。我发现映射和完成通知的同步开销非常昂贵,而TPL Dataflow则针对像我这样的场景。但这并不否定您对我的具体问题的解决方案... - Matt
这篇文章并未涉及同步和完成通知的问题。它是关于保留顺序的Parallel.Foreach,而你提出的解决方案正好符合要求。非常感谢。 - Matt
好观点!任务并行库是一种更优越的解决方案。我很惊讶它的表现如此出色。也许它通过限制任务执行来使每个线程执行多个序列化操作?对于上述问题,您可以使用整数计数器和 Interlocked.Increment(ref counter) 进行完整通知,以测试是否已处理所有元素。然后设置一个等待句柄以继续进行。 - Dr. Andrew Burnett-Thompson

2
你可以使用一个线程安全的字典,并使用索引 int 键来存储从 foo 返回的结果,这样最终你会在字典中拥有所有按顺序排序的数据。

谢谢,我知道这一点,但听起来非常复杂。基本上,我需要另一个完整的过程来等待下一个按顺序到来的Foo,检查同步性,并按正确的顺序将项目添加到结果列表中。看起来有相当多的开销,可能会破坏并行运行的整个概念。 - Matt

0

将结果收集到数组中比收集到 List<Foo> 更容易。假设 List<byte[]> 的名称为 source,你可以这样做:

Foo[] output = new Foo[source.Count];

ParallelOptions options = new() { MaxDegreeOfParallelism = Environment.ProcessorCount };

Parallel.ForEach(source, options, (byteArray, state, index) =>
{
    output[index] = Deserialize(byteArray);
});

注意到没有任何形式的同步(lock等)。
上述代码之所以有效,是因为允许并发更新数组,只要每个线程更新其索引的独占子集¹。在完成Parallel.ForEach操作后,当前线程将看到output数组填充有完全初始化的Foo实例,无需手动插入内存屏障。TPL在任务执行结束时自动包含内存屏障(引用),而Parallel.ForEach在内部基于任务(因此有TPL缩写)。
直接在List<Foo>中收集结果更复杂,因为List<T>集合在并发写操作方面被明确文档化为不具备线程安全性。您可以使用lock语句安全地更新它,如下所示:
List<Foo> output = new(source.Count);
for (int i = 0; i < source.Count; i++) output.Add(default);

Parallel.ForEach(source, options, (byteArray, state, index) =>
{
    Foo foo = Deserialize(byteArray);
    lock (output) output[(int)index] = foo;
});

请注意,lock仅保护output列表的更新。如果Deserialize同步化,那么并行化的目的将会失败。
从.NET 8开始,可以使用高级的CollectionsMarshal.SetCount API,预先填充一个未初始化的List<T>,而无需进行循环。
List<Foo> output = new(source.Count); // Set the Capacity
CollectionsMarshal.SetCount(output, source.Count); // Set the Count

替代方案:如果你愿意从Parallel.ForEach切换到PLINQ,那就更简单了。通过使用PLINQ查询,你可以在并行操作中收集结果,而无需依赖副作用。只需在查询的末尾使用ToListToArray运算符:

List<Foo> output = source
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(byteArray => Deserialize(byteArray))
    .ToList();

不要忘记包括AsOrdered操作符,否则顺序将不会保留。
¹ 虽然没有明确记录,但普遍认同

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接