限制异步方法的并行性,不阻塞线程池线程

11
我有一个异步方法RequestInternalAsync()用于向外部资源发出请求,想要编写一个包装器方法来通过减少并行度限制该方法的并发异步请求数量。
首先想到的选项是使用具有有限并发性的 TaskScheduler(如LimitedConcurrencyLevelTaskSchedulerConcurrentExclusiveSchedulerPair等)。
但是要使用自定义调度程序运行任务,必须使用只接受Action <>TaskFactory启动任务, 即不能为仅等待内部方法执行而不阻塞额外线程。
第二个选项是SemaphoreSlim,它可以完成工作,但在这种情况下,我正在实现自己的节流控制,而不是使用TaskScheduler
static void Main(string[] args)
{
    // TESTING 1

    var task1 = Task.WhenAll(Enumerable.Range(1, 10).Select(i => RequestAsyncBad()));

    task1.Wait();

    // TESTING 2

    var task2 = Task.WhenAll(Enumerable.Range(1, 10).Select(i => RequestAsyncBetter()));

    task2.Wait();
}

private static Task RequestInternalAsync()
{
    return Task.Delay(500);
}

解决方案 #1:

private static readonly ConcurrentExclusiveSchedulerPair _concurrentPair
    = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2);

public static Task RequestAsyncBad()
{
    // Dumb: Because TaskFactory doesn't provide an overload which accepts another task, only action.
    // As result, we blocking a thread to just wait until the inner task finishes.

    return Task.Factory.StartNew(() => RequestInternalAsync().Wait(),
        CancellationToken.None, TaskCreationOptions.DenyChildAttach, _concurrentPair.ConcurrentScheduler);
}

解决方案 #2(更好):

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(2);

public static async Task RequestAsyncBetter()
{
    // Here we don't waste thread-pool thread on waiting for a completion of inner task,
    // but instead of using TaskScheduler, implementing a hand-made stuff with semaphore. 

    await _semaphore.WaitAsync().ConfigureAwait(false);

    try
    {
        await RequestInternalAsync();
    }
    finally
    {
        _semaphore.Release();
    }
}

如何更优雅地实现此功能?

  • 重用TPL的标准Task API和TaskScheduler
  • 而不会阻塞额外的线程
2个回答

8

TaskScheduler 只对 CPU-bound 工作有用。你的工作没有使用线程,而是使用IO完成端口。这意味着你的网络调用根本不会占用任何线程。无法在 IO 操作中涉及 TaskScheduler

如果你还不确定:.NET 中的 Async IO 基于使用 TaskCompletionSource,它与线程或调度程序没有任何绑定。

SemaphoreSlim 是正确的方法。或者,创建一个 ServicePoint 并设置其最大并发数。仅适用于 HTTP 请求。

请注意,如果你发现自己在使用 Wait,你应该犹豫并思考你正在做什么。通常情况下,这是一个错误。


谢谢!那么,我的理解是,TaskScheduler只负责任务的启动时间和位置,而不设计控制已启动任务的生命周期? - Sergey Kostrukov
2
我不会这么说。需要注意的一点是,有两种任务类型(这容易让人混淆!):CPU工作(由最终运行的委托支持的任务)和由TaskCompletionSource支持的“其他工作”。只有第一种情况使用TaskScheduler(并且它总是使用一个)。因此,我认为TaskScheduler负责通过在适当的时间和地点运行委托来完成由委托支持的任务。 - usr
无法为TCS提供调度程序,也从未有TPL要求执行基于TCS的任务。我相信理解这一点在使用TPL时会非常有启发性。 - usr
+1. 我最近开始使用“委托任务”和“承诺任务”这些术语来区分不同类型的任务。并不是说每个人都必须使用这种术语,但在讨论差异时可能会有所帮助。 - Stephen Cleary
@StephenCleary 对我来说问题在于我没有找到任何信息来源,可以解释TaskScheduler的设计目的以及为什么TaskFactory有意只接受委托。感谢您的评论。我真的很喜欢阅读您的博客,并期待着Kindle版的问世 :) - Sergey Kostrukov

0
public static async Task RequestAsyncCool()
{
    await Task.Factory.StartNew(async () => {
            await RequestInternalAsync();
        },
        CancellationToken.None, 
        TaskCreationOptions.DenyChildAttach, 
        TaskScheduler.Current);
}

你真的不应该为任务使用Wait。请参考https://www.google.com/search?q=task+wait+deadlock

你有没有研究过TPL DataFlow?它可能正是你需要的东西...


这里的问题是,如果我将一个异步方法放到Task.Factory.StartNew中,并且该方法接受Func<Task>,并通过指定一个自定义的TaskScheduler来限制并发数量,则并发限制只对第一部分异步方法起作用,在第一个2个方法完成之前不会等待再开始新的方法(2是我的并行限制)。这就是为什么我不得不在那里加上Wait()的原因,也是为什么我不喜欢这样做并提出了这个问题 :) - Sergey Kostrukov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接