在执行Parallel.ForEach期间更改parallelOptions.MaxDegreeOfParallelism是否可能?

20

我正在运行一个多线程循环:

protected ParallelOptions parallelOptions = new ParallelOptions();

parallelOptions.MaxDegreeOfParallelism = 2;
Parallel.ForEach(items, parallelOptions, item =>
{
    // Loop code here
});
我想在并行循环执行期间更改parallelOptions.MaxDegreeOfParallelism,以减少或增加线程数量。
parallelOptions.MaxDegreeOfParallelism = 5;

它似乎无法增加线程。有人有任何想法吗?


你能解释一下为什么要那样做吗?如果需要的话,它将自动增加线程数,直到达到指定的最大值,或者根据需要使用更少的线程。 - Ian Mercer
1
主要是为了在使用共享服务器时,当应用程序占用过多资源时对其进行限制。我发现服务器运行达到100%,这会降低服务器上其他应用程序的响应能力。 - John Egbert
这是一个CPU密集型问题还是其他“资源”(如磁盘)的问题? - Ian Mercer
这种情况特别是CPU,但线程数量也可能会影响内存和磁盘,具体取决于foreach的内容。如果系统资源过度负荷,我希望能够更改所使用的总线程数。 - John Egbert
请参阅https://dev59.com/yXA75IYBdhLWcg3wFEsI。 - Ian Mercer
3个回答

5
尝试解决这个问题的困难在于它是一个艰巨的问题。首先,如何可靠地观察CPU和磁盘利用率?不经常对CPU进行采样会给出一个不准确的情况,而对磁盘利用率进行采样更加困难。其次,任务的粒度是多少,您可以多快地更改正在运行的任务数量。第三,事物随着时间的推移而迅速变化,因此您需要对观察结果应用某种过滤。第四,理想的线程数将取决于代码实际运行的CPU。第五,如果分配了太多线程,您将在它们之间反复切换而无法执行有用的工作。
请参见http://msdn.microsoft.com/en-us/magazine/ff960958.aspx,以了解.NET中的线程池如何处理决定使用多少线程的复杂任务的讨论。
您还可以使用反编译器并查看TPL用于分配线程和避免不必要上下文切换的代码 - 它很复杂,甚至没有考虑磁盘访问!
你可以尝试在较低优先级的线程上执行任务(创建一个运行优先级为 below-normal 的自定义 TaskScheduler 实际上非常容易)。这至少可以确保你可以在不影响系统其余部分的情况下使用 100% 的 CPU。调整线程优先级本身就充满问题,但如果这是纯后台任务,则可能很简单并可能有所帮助。
然而,当涉及相对较慢的磁盘访问时,磁盘利用率往往是其他应用程序因贪婪应用程序而受到影响的真正罪魁祸首。Windows 可以轻松地在应用程序之间公平分配 CPU,但当涉及相对较慢的磁盘访问时,情况就完全不同了。你可能需要简单地限制你的应用程序,使其不会太频繁地访问磁盘,而无需更改活动线程的数量。
你还可以查看 SetPriorityClass 作为一种通知操作系统你的进程不如系统上运行的其他应用程序重要的方法,请参见 How can I/O priority of a process be increased? 获取更多信息。但这假设你的整个进程不重要,而不仅仅是其中的一部分。

3

我不认为在调用ForEach之后改变并行度是可能的。据我了解,ForEach将确定它可以创建多少线程,创建相应数量的分区,并创建线程来操作这些分区。它没有任何时候可以说:“哦,等等,他改变了我们的资源分配,让我重新对数组进行分区和重新分配线程。”


1
是的,我同意你的理解。但是,你能否添加一个参考链接呢? - Abdul Saboor

0
这是.NET 6的一个变体Parallel.ForEachAsync API,它允许动态配置并行度。它与原生API共享相同的参数和行为,唯一的区别是它接受一个派生版本的ParallelOptions作为参数(DynamicParallelOptions)。这个类有一个额外的属性DegreeOfParallelism。改变这个属性会迅速适应当前活动的并行度。
这个实现是基于对Parallel.ForEachAsync API的源序列进行限流的想法。API本身被配置为DegreeOfParallelism的最大预期值。实际的并行度通过限制循环对源元素的自由访问来有效地进行限制。每当处理另一个元素时,就会向前传播一个元素。限流本身是使用一个无界的SemaphoreSlim来执行的。通过调用信号量的Release/WaitAsync方法来改变最大并行度。
/// <summary>
/// Executes a parallel foreach operation on an asynchronous sequence, enforcing
/// a degree of parallelism that can be dynamically changed during the execution.
/// </summary>
public static Task DynamicParallelForEachAsync<TSource>(
    IAsyncEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(body);

    SemaphoreSlim throttler = new(options.DegreeOfParallelism);
    options.DegreeOfParallelismChangedDelta += Options_ChangedDelta;
    void Options_ChangedDelta(object sender, int delta)
    {
        if (delta > 0)
            throttler.Release(delta);
        else
            for (int i = delta; i < 0; i++) throttler.WaitAsync();
    }

    async IAsyncEnumerable<TSource> GetThrottledSource(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        IAsyncEnumerator<TSource> enumerator = source.GetAsyncEnumerator(
            cancellationToken);
        await using (enumerator.ConfigureAwait(false))
        {
            while (true)
            {
                await throttler.WaitAsync().ConfigureAwait(false);
                if (!await enumerator.MoveNextAsync().ConfigureAwait(false)) break;
                yield return enumerator.Current;
            }
        }
    }

    return Parallel.ForEachAsync(GetThrottledSource(), options, async (item, ct) =>
    {
        try { await body(item, ct).ConfigureAwait(false); }
        finally { throttler.Release(); }
    }).ContinueWith(t =>
    {
        options.DegreeOfParallelismChangedDelta -= Options_ChangedDelta;
        return t;
    }, default, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

/// <summary>
/// Stores options that configure the DynamicParallelForEachAsync method.
/// </summary>
public class DynamicParallelOptions : ParallelOptions
{
    private int _degreeOfParallelism;

    public event EventHandler<int> DegreeOfParallelismChangedDelta;

    public DynamicParallelOptions(int maxDegreeOfParallelism)
    {
        // The native Parallel.ForEachAsync will see the base.MaxDegreeOfParallelism.
        base.MaxDegreeOfParallelism = maxDegreeOfParallelism;
        _degreeOfParallelism = Environment.ProcessorCount;
    }

    public int DegreeOfParallelism
    {
        get { return _degreeOfParallelism; }
        set
        {
            if (value < 1) throw new ArgumentOutOfRangeException();
            if (value == _degreeOfParallelism) return;
            int delta = value - _degreeOfParallelism;
            DegreeOfParallelismChangedDelta?.Invoke(this, delta);
            _degreeOfParallelism = value;
        }
    }
}

DynamicParallelOptions.DegreeOfParallelism属性不是线程安全的。假定控制并行度将由单个线程执行,或者至少操作将被同步。

使用示例,以 Channel<T> 作为并行循环的源:

Channel<int> channel = Channel.CreateUnbounded<int>();
DynamicParallelOptions options = new(maxDegreeOfParallelism: 50)
{
    DegreeOfParallelism = 2
};

await DynamicParallelForEachAsync(
    channel.Reader.ReadAllAsync(), options, async (item, ct) =>
    {
        Console.WriteLine($"Processing #{item}");
        await Task.Delay(1000, ct); // Simulate an I/O-bound operation
    });

// Push values to the channel from any thread
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.TryWrite(3);
channel.Writer.Complete();

// Set the DegreeOfParallelism to a positive value at any time from a single thread
options.DegreeOfParallelism = 5;

一些使用同步的重载函数,包括同步的source或同步的body:
public static Task DynamicParallelForEachAsync<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Func<TSource, CancellationToken, ValueTask> body)
{
    ArgumentNullException.ThrowIfNull(source);

    #pragma warning disable CS1998
    async IAsyncEnumerable<TSource> GetSource()
    { foreach (TSource item in source) yield return item; }
    #pragma warning restore CS1998

    return DynamicParallelForEachAsync(GetSource(), options, body);
}

public static void DynamicParallelForEach<TSource>(
    IEnumerable<TSource> source,
    DynamicParallelOptions options,
    Action<TSource, CancellationToken> body)
{
    ArgumentNullException.ThrowIfNull(body);
    DynamicParallelForEachAsync(source, options, (item, ct) =>
    {
        body(item, ct); return ValueTask.CompletedTask;
    }).Wait();
}
DegreeOfParallelism的默认值是Environment.ProcessorCount。允许将DegreeOfParallelism设置为大于MaxDegreeOfParallelism的值,尽管它没有任何效果。 MaxDegreeOfParallelism表示有效并行度的硬上限。建议使用合理的值配置MaxDegreeOfParallelism。如果将其设置得太高,例如Int32.MaxValue,可能会增加整个操作的内存和CPU开销,特别是如果source包含大量元素。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接