异步操作并行化的问题已经在.NET 6中引入
Parallel.ForEachAsync
API得到解决,但是使用旧版.NET平台的人们可能仍然需要一个合适的替代品。实现一种简单的方法是使用
TPL Dataflow库中的
ActionBlock<T>
组件。该库包含在标准.NET库(.NET Core和.NET 5+)中,并且可用作.NET Framework的
NuGet包。以下是如何使用它:
public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
int maxDegreeOfParallelism, Func<T, Task> action)
{
var options = new ExecutionDataflowBlockOptions();
options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
var block = new ActionBlock<T>(action, options);
foreach (var item in source) block.Post(item);
block.Complete();
return block.Completion;
}
此解决方案仅适用于实例化的“source”序列,因此参数类型为“ICollection<T>”,而不是更常见的“IEnumerable<T>”。它还具有忽略任何由“action”抛出的“OperationCanceledException”的令人惊讶的行为。解决这些细微差别并尝试精确复制“Parallel.ForEachAsync”的功能和行为是可行的,但需要几乎与使用更原始的工具一样多的代码。我在
答案的第9个修订版中发布了这样的尝试。
以下是另一种实现
Parallel.ForEachAsync
方法的尝试,提供与.NET 6 API完全相同的功能,并尽可能模拟其行为。它仅使用基本的TPL工具。其思想是创建与所需的
MaxDegreeOfParallelism
相等的工作任务数量,每个任务以同步方式枚举相同的枚举器。这类似于
Parallel.ForEachAsync
内部实现的方式。不同之处在于,.NET 6 API从单个工作线程开始逐渐添加更多线程,而下面的实现从一开始就创建所有工作线程:
public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
ParallelOptions parallelOptions,
Func<T, CancellationToken, Task> body)
{
if (source == null) throw new ArgumentNullException("source");
if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
if (body == null) throw new ArgumentNullException("body");
int dop = parallelOptions.MaxDegreeOfParallelism;
if (dop < 0) dop = Environment.ProcessorCount;
CancellationToken cancellationToken = parallelOptions.CancellationToken;
TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;
IEnumerator<T> enumerator = source.GetEnumerator();
CancellationTokenSource cts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
var semaphore = new SemaphoreSlim(1, 1);
var workerTasks = new Task[dop];
for (int i = 0; i < dop; i++)
{
workerTasks[i] = Task.Factory.StartNew(async () =>
{
try
{
while (true)
{
if (cts.IsCancellationRequested)
{
cancellationToken.ThrowIfCancellationRequested();
break;
}
T item;
await semaphore.WaitAsync();
try
{
if (!enumerator.MoveNext()) break;
item = enumerator.Current;
}
finally { semaphore.Release(); }
await body(item, cts.Token);
}
}
catch { cts.Cancel(); throw; }
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
.Unwrap();
}
return Task.WhenAll(workerTasks).ContinueWith(t =>
{
try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
return t;
}, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
签名不同。body参数的类型为Func,而不是Func。这是因为value-task是一个相对较新的特性,在.NET Framework中不可用。
行为上也有所不同。该实现对
body
抛出的OperationCanceledException做出反应,以取消完成。正确的行为应该是将这些异常作为单独的错误传播,并完成为故障。修复这个小缺陷是可行的,但我更喜欢不进一步复杂化这个相对简短和可读的实现。
Parallel.ForEachAsync
API。 - Theodor Zoulias