使用BlockingCollection.GetConsumableEnumerable的Parallel.ForEach循环

23

为什么使用 GetConsumableEnumerable 时,Parallel.ForEach 循环会退出并出现 OperationCancelledException 异常?

//outside the function
static BlockingCollection<double> _collection = new BlockingCollection<double>();
    
    
var t = Task.Factory.StartNew(Producer);            
Parallel.ForEach(_collection.GetConsumingEnumerable(),
    item => Console.WriteLine("Processed {0}", item));
Console.WriteLine("FINISHED processing");


public static void Producer()
{
     var data = Enumerable.Range(1, 1000);
     foreach (var i in data)
     {
        _collection.Add(i);
        Console.WriteLine("Added {0}",i);
     }
                    
     Console.WriteLine("Finished adding");
     _collection.CompleteAdding();
}

我无法在.NET 7上重现OperationCancelledException行为。问题中的代码运行成功完成,没有抛出任何异常。 - Theodor Zoulias
需要注意的是,当使用BlockingCollection<T>作为并行操作的源时,建议使用配置了EnumerablePartitionerOptions.NoBuffering选项的Partitioner,如此处所示。否则,消费者可能会尝试咬下更多,导致延迟增加,并有可能死锁。 - Theodor Zoulias
2个回答

25

真正让我困惑的是,为什么在调用_collection.CompleteAdding()时Parallel.ForEach会抛出异常。 - Sam
@Sam:说实话,我不想发表意见。在那里有太多深奥的魔法,让我无法自信地说出正确的话 :) - Jon Skeet
Parallel Extensions Extras的当前URL为: https://code.msdn.microsoft.com/ParExtSamples 而且有人创建了该扩展的NuGet版本: https://www.nuget.org/packages/MSFT.ParallelExtensionsExtras/ - Reyhn
1
Can Bilgin在这篇新一点的文章中探讨了这种组合 - 链接 - MuKa

0
使用 Parallel.ForEachBlockingCollection<T> 作为源,需要进行两个特定的调整:
  1. 使用 EnumerablePartitionerOptions.NoBuffering 选项。
  2. MaxDegreeOfParallelism 指定为非 -1 的值。

正确使用示例:

Partitioner<Item> partitioner = Partitioner.Create(collection.GetConsumingEnumerable(),
    EnumerablePartitionerOptions.NoBuffering);

ParallelOptions options = new()
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(partitioner, options, item =>
{
    //...
});

解释:
  1. 需要使用EnumerablePartitionerOptions.NoBuffering,否则Parallel.ForEach将不会立即处理每个消耗的项。相反,它会将项放入一个小缓冲区,并等待缓冲区达到任意大小后才处理所有项。这种行为会引入不必要的延迟,甚至在某些高级场景中可能导致死锁。
  2. 配置MaxDegreeOfParallelism是必需的,以便将ThreadPool的使用控制在合理范围内。否则,Parallel.ForEach将不断请求更多的线程,促使ThreadPool以每秒一个新线程的速度创建新线程,即使BlockingCollection<T>完全为空且并行循环处于空闲状态!有关此奇怪行为的实验演示,请参见此问题

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接