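This is a variant of the .NET 6 Parallel.ForEachAsync API that allows the degree of parallelism to be configured dynamically. It shares the same parameters and behavior as the native API, the only difference being that it accepts a derived version of ParallelOptions as its argument (DynamicParallelOptions). This class has one additional property, DegreeOfParallelism. Changing this property causes the currently active degree of parallelism to adapt quickly.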
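This implementation is based on the idea of throttling the source sequence of the Parallel.ForEachAsync API. The API itself is configured with the maximum expected value of DegreeOfParallelism (that is, MaxDegreeOfParallelism). The actual parallelism is then limited by denying the loop free access to the source elements: one element is propagated forward each time another element finishes processing. The throttling itself is performed with an unbounded SemaphoreSlim. The maximum degree of parallelism is changed by calling the semaphore's Release/WaitAsync methods.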
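The way Release and WaitAsync move the limit up and down can be seen in isolation. The following is a minimal sketch, not part of the implementation itself; it only assumes the standard SemaphoreSlim behavior, where the single-argument constructor leaves the maximum count unbounded.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// A semaphore created with only an initial count has no upper bound,
// so permits can be added later with Release.
SemaphoreSlim throttler = new(2);
Console.WriteLine(throttler.CurrentCount); // 2

// Raising the limit from 2 to 5: add three permits.
throttler.Release(3);
Console.WriteLine(throttler.CurrentCount); // 5

// Lowering the limit from 5 to 3: take two permits and never return them.
for (int i = 0; i < 2; i++) await throttler.WaitAsync();
Console.WriteLine(throttler.CurrentCount); // 3
```

When no permits are free at the moment the limit is lowered, the WaitAsync calls simply stay pending until running work releases permits, so the reduction takes effect gradually rather than interrupting items already in flight.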
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static Task DynamicParallelForEachAsync<TSource>(
    IAsyncEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(body);

    // Unbounded semaphore whose permit count tracks the current DegreeOfParallelism.
    SemaphoreSlim throttler = new(options.DegreeOfParallelism);
    options.DegreeOfParallelismChangedDelta += Options_ChangedDelta;
    void Options_ChangedDelta(object sender, int delta)
    {
        if (delta > 0)
            throttler.Release(delta);
        else
            // Each fire-and-forget WaitAsync permanently consumes one permit.
            for (int i = delta; i < 0; i++) throttler.WaitAsync();
    }

    // Yields a source element only after a permit has been acquired.
    async IAsyncEnumerable<TSource> GetThrottledSource(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        IAsyncEnumerator<TSource> enumerator = source.GetAsyncEnumerator(
            cancellationToken);
        await using (enumerator.ConfigureAwait(false))
        {
            while (true)
            {
                await throttler.WaitAsync().ConfigureAwait(false);
                if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) break;
                yield return enumerator.Current;
            }
        }
    }

    return Parallel.ForEachAsync(GetThrottledSource(), options, async (item, ct) =>
    {
        try { await body(item, ct).ConfigureAwait(false); }
        finally { throttler.Release(); } // Let the next element through.
    }).ContinueWith(t =>
    {
        // Unsubscribe when the loop completes, then propagate its result.
        options.DegreeOfParallelismChangedDelta -= Options_ChangedDelta;
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
public class DynamicParallelOptions : ParallelOptions
{
    private int _degreeOfParallelism;

    public event EventHandler<int> DegreeOfParallelismChangedDelta;

    public DynamicParallelOptions(int maxDegreeOfParallelism)
    {
        base.MaxDegreeOfParallelism = maxDegreeOfParallelism;
        _degreeOfParallelism = Environment.ProcessorCount;
    }

    public int DegreeOfParallelism
    {
        get { return _degreeOfParallelism; }
        set
        {
            if (value < 1) throw new ArgumentOutOfRangeException();
            if (value == _degreeOfParallelism) return;
            int delta = value - _degreeOfParallelism;
            DegreeOfParallelismChangedDelta?.Invoke(this, delta);
            _degreeOfParallelism = value;
        }
    }
}
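The DynamicParallelOptions.DegreeOfParallelism property is not thread-safe. It is assumed that the degree of parallelism will be controlled by a single thread, or at least that the operations will be synchronized.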
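If several threads do need to adjust the value, one option is to funnel every change through a single lock. The sketch below is only an illustration under that assumption: gate, degreeOfParallelism and ChangeDegree are hypothetical names, and a plain local variable stands in for options.DegreeOfParallelism so that the example runs on its own.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

object gate = new();          // hypothetical lock shared by every caller
int degreeOfParallelism = 1;  // stand-in for options.DegreeOfParallelism

// Hypothetical helper: the read-modify-write happens entirely under the lock.
void ChangeDegree(int delta)
{
    lock (gate)
    {
        int next = Math.Max(1, degreeOfParallelism + delta); // the property rejects values below 1
        degreeOfParallelism = next; // in real code: options.DegreeOfParallelism = next;
    }
}

// Ten concurrent callers each raise the value by one.
await Task.WhenAll(Enumerable.Range(0, 10)
    .Select(_ => Task.Run(() => ChangeDegree(+1))));

Console.WriteLine(degreeOfParallelism); // 11
```

Because the setter raises the change event before storing the new value, holding the lock around the whole assignment also keeps the semaphore adjustments in the same order as the assignments.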
Usage example, featuring a Channel<T> as the source of the parallel loop:
Channel<int> channel = Channel.CreateUnbounded<int>();
DynamicParallelOptions options = new(maxDegreeOfParallelism: 50)
{
    DegreeOfParallelism = 2
};

// Start the loop without awaiting it yet, so that the code below can run
Task loop = DynamicParallelForEachAsync(
    channel.Reader.ReadAllAsync(), options, async (item, ct) =>
    {
        Console.WriteLine($"Processing #{item}");
        await Task.Delay(1000, ct);
    });

// Push values to the channel from any thread
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryWrite(3);
channel.Writer.Complete();

// Set the DegreeOfParallelism to a positive value at any time from a single thread
options.DegreeOfParallelism = 5;

// Wait for all pushed values to be processed
await loop;
Some overloads with a synchronous source or a synchronous body:
public static Task DynamicParallelForEachAsync<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);
    #pragma warning disable CS1998
    async IAsyncEnumerable<TSource> GetSource()
    { foreach (TSource item in source) yield return item; }
    #pragma warning restore CS1998
    return DynamicParallelForEachAsync(GetSource(), options, body);
}

public static void DynamicParallelForEach<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Action<TSource, CancellationToken> body)
{
    ArgumentNullException.ThrowIfNull(body);
    DynamicParallelForEachAsync(source, options, (item, ct) =>
    {
        body(item, ct); return ValueTask.CompletedTask;
    }).Wait();
}
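The default value of DegreeOfParallelism is Environment.ProcessorCount. Setting DegreeOfParallelism to a value greater than MaxDegreeOfParallelism is allowed, although it has no effect: MaxDegreeOfParallelism is a hard upper limit on the effective degree of parallelism. It is advisable to configure MaxDegreeOfParallelism with a reasonable value. Setting it too high, for example to Int32.MaxValue, may increase the memory and CPU overhead of the whole operation, especially if the source contains a large number of elements.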
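The hard cap can be observed with a small experiment. This is a rough sketch rather than the implementation above: for brevity it acquires the semaphore inside the loop body instead of throttling the source, and the names running and peak are hypothetical counters added only for measurement. The semaphore offers 5 permits, but the loop is capped at 2.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int running = 0, peak = 0;                 // hypothetical counters for measurement
SemaphoreSlim throttler = new(5);          // plays the role of DegreeOfParallelism = 5
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 }; // hard cap

await Parallel.ForEachAsync(Enumerable.Range(1, 20), options, async (item, ct) =>
{
    await throttler.WaitAsync(ct);
    try
    {
        int now = Interlocked.Increment(ref running);
        // Record the highest concurrency observed so far.
        int seen;
        while (now > (seen = Volatile.Read(ref peak)))
            Interlocked.CompareExchange(ref peak, now, seen);
        await Task.Delay(50, ct);
    }
    finally
    {
        Interlocked.Decrement(ref running);
        throttler.Release();
    }
});

// The three extra permits are never used: peak should not exceed 2.
Console.WriteLine($"Peak concurrency: {peak}");
```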