在.NET 6.0中,是否有可能通过限制并行执行(Parallel.ForEachAsync)的速率来避免速率限制?

3

我对编程还比较新(< 3年经验),所以我对本篇文章中的主题理解不够深刻,请多多包涵。

我的团队正在开发与第三方系统的集成,其中一个第三方终端缺乏一种有意义的方式来获取符合条件的实体列表。

我们一直在通过循环请求集合,并将每个等待调用的结果添加到列表中来获取这些实体。这样做可以正常工作,但是获取实体的时间比从其他允许我们提供ID列表来获取实体列表的终端花费的时间要长得多。

.NET 6.0引入了Parallel.ForEachAsync(),它可以让我们在并行异步执行多个可等待任务。

例如:

public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(List<IRestRequest> requests) 
where TEntity : IEntity
{
    var entities = new ConcurrentBag<TEntity>();

    // Create a function that takes a RestRequest and returns the 
    // result of the request's execution, for each request
    var requestExecutionTasks = requests.Select(i => 
        new Func<Task<TEntity>>(() => GetAsync<TEntity>(i)));

    // Execute each of the functions asynchronously in parallel, 
    // and add the results to the aggregate as they come in
    await Parallel.ForEachAsync(requestExecutionTasks, new ParallelOptions
    {
        // This lets us limit the number of threads to use. -1 is unlimited
        MaxDegreeOfParallelism = -1 
    }, async (func, _) => entities.Add(await func()));

    return entities.ToList();
}

使用这段代码而不是简单的foreach循环,可以平均加快在我的测试实例上获取约30个实体所需的时间91%。这太棒了。然而,我们担心当我们在客户系统上使用它时可能会出现速率限制,该系统可能有数千个实体。我们有一个系统来检测他们的API中的“您的速率已受限制”消息,并在一秒钟左右排队请求,然后再尝试,但这不是一个好的解决方案,只是一种安全措施。
如果我们只是循环遍历请求,我们可以通过在每次迭代中执行类似于await Task.Delay(minimumDelay)的操作来限制调用。请纠正我,如果我错了,但据我所知,当在并行foreach中执行请求时,这实际上不起作用,因为它会使所有请求在执行之前等待相同的时间。是否有办法使每个单独的请求在执行之前等待一定的时间,只有当我们接近速率限制时才这样做?如果可能的话,我想这样做而不限制要使用的线程数。
编辑
我想让这个问题保持一段时间,以便更多的人可以回答。由于没有新的答案或评论被添加,我将我得到的唯一答案标记为正确。也就是说,答案提供了一种不同的方法,而不是使用Parallel.ForEachAsync。
如果我正确理解了当前的答案,那么对于我的原始问题是否可能限制Parallel.ForEachAsync的操作,答案将是:“不可能”。

2
您可以在这个问题中找到RateLimiter类。速率限制意味着您希望在任何时间段内对可以启动的操作数量进行限制。不考虑每个操作的持续时间。如果您想对同时运行的操作数量进行限制,而不是对启动的操作数量进行限制,请使用MaxDegreeOfParallelism选项。 - Theodor Zoulias
@auburg 是的,这是可能的,但我更喜欢在创建要执行的任务时配置延迟。我知道第三方API之前限制了我们的调用次数。我不记得是哪一个,但他们的限制要么是每秒60个调用,要么是每秒100个调用。 - Hyperwave
正如@TheodorZoulias所暗示的那样,Parallel.ForEachAsync对此来说有点太基础了。如果您添加了延迟,这并不会阻止foreach向线程池添加任务,它们只是都带有延迟(呕)。您需要一个“真正”的速率限制器,在达到限制时停止添加新任务。 - AndyPook
顺便提一下,使用带有无限并行性的Parallel.ForEachAsync API与一次启动所有任务并使用await Task.WhenAll(tasks)等待它们没有太大区别。可以说,使用这种更简单的方法可能更好,因为Task.WhenAll API自然地返回原始顺序的结果,并且不需要处理并发集合,如ConcurrentQueue<T>ConcurrentBag<T> - Theodor Zoulias
如果我使用MaxDegreeOfParallelism来限制调用次数,这是否意味着我需要计算系统吞吐量或平均执行时间之类的内容,以便在保持低于限制的情况下优化速率?@TheodorZoulias - Hyperwave
显示剩余2条评论
1个回答

2

我建议放弃使用Parallel.ForEachAsync方法,而是使用新的Chunk LINQ运算符,结合Task.WhenAll方法。您可以像这样每秒启动100个异步操作:

public async Task<List<TEntity>> GetEntitiesInParallelAsync<TEntity>(
    List<IRestRequest> requests) where TEntity : IEntity
{
    var tasks = new List<Task<TEntity>>();
    foreach (var chunk in requests.Chunk(100))
    {
        tasks.AddRange(chunk.Select(request => GetAsync<TEntity>(request)));
        await Task.Delay(TimeSpan.FromSeconds(1.0));
    }
    return (await Task.WhenAll(tasks)).ToList();
}

假设启动异步操作(调用GetAsync方法)所需的时间是可以忽略不计的。

这种方法的固有缺点在于,如果发生异常,则在完成所有操作之前,故障将不会传播。相比之下,Parallel.ForEachAsync方法在检测到第一个失败后立即停止调用异步委托并尽快完成。


@Hyperwave Parallel.ForEachAsync 允许限制并行度 (MaxDegreeOfParallelism),但它没有内置功能来限制速率。在下一个 .NET 版本 (.NET 7) 中,可能会提供新的 API 来提供此功能 (速率限制 API)。目前只存在自定义/第三方解决方案,如 这个 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接