限制并发异步请求的循环节流。

6

我正在处理对Web API的大量请求。我尝试使用async来加速这个过程,但是我无法对连接进行限速,以便每秒不发送超过10个请求。我正在使用信号量进行限速,但是由于有嵌套循环,所以我不确定它在这种情况下是否有效。

基本上,我正在获取一个模型列表,每个模型内部都有一个天数列表。我需要为每个模型中的每一天发出一个请求。天数的数量可以从1到大约50,99%的时间只会是1。因此,我想要异步处理每个模型,因为大约会有3000个模型,但是在需要完成多个天数的情况下,我想要异步处理每一天。我需要保持在每秒10个请求以下,因此我认为最好的方法是在整个操作上设置请求限制为10。是否有一个地方可以放置信号量,以限制整个链的连接?

每个单独的请求还必须获取2个不同数据的请求,并且此API目前不支持任何批处理。

我对C#和async以及WebRequests / HttpClient都比较新,因此任何帮助都将不胜感激。我尝试在这里添加所有相关代码。如果您需要其他信息,请告诉我。

public static async Task GetWeatherDataAsync(List<Model> models)
{
    SemaphoreSlim semaphore = new SemaphoreSlim(10);
    var taskList = new List<Task<ComparisonModel>>();

    foreach (var x in models)
    {
        await semaphore.WaitAsync();
        taskList.Add(CompDaysAsync(x));
    }

    try
    {
        await Task.WhenAll(taskList.ToArray());
    }
    catch (Exception e) { }
    finally
    {
        semaphore.Release();
    }
}

public static async Task<Models> CompDaysAsync(Model model)
{
    var httpClient = new HttpClient();
    httpClient.DefaultRequestHeaders.Authorization = new 
                Headers.AuthenticationHeaderValue("Token","xxxxxxxx");
    httpClient.Timeout = TimeSpan.FromMinutes(5);
    var taskList = new List<Task<Models.DateTemp>>();

    foreach (var item in model.list)
    {
        taskList.Add(WeatherAPI.GetResponseForDayAsync(item, 
            httpClient, Latitude, Longitude));
    }
    httpClient.Dispose();
    try
    {
        await Task.WhenAll(taskList.ToArray());
    }
    catch (Exception e) { }

    return model;
}

public static async Task<DateTemp> GetResponseForDayAsync(DateTemp date, HttpClient httpClient, decimal? Latitude, decimal? Longitude)
{
    var response = await httpClient.GetStreamAsync(request1);
    StreamReader myStreamReader = new StreamReader(response);
    string responseData = myStreamReader.ReadToEnd();
    double[] data = new double[2];
    if (responseData != "[[null, null]]")
    {
        data = Array.ConvertAll(responseData.Replace("[", "").Replace("]", "").Split(','), double.Parse);
    }
    else { data = null; };

    double precipData = 0;
    var response2 = await httpClient.GetStreamAsync(request2);
    StreamReader myStreamReader2 = new StreamReader(response2);
    string responseData2 = myStreamReader2.ReadToEnd();
    if (responseData2 != null && responseData2 != "[null]" && responseData2 != "[0.0]")
    {
        precipData = double.Parse(responseData2.Replace("[", "").Replace("]", ""));
    }
    date.Precip = precipData;

    if (data != null)
    {
        date.minTemp = data[0];
        date.maxTemp = data[1];
    }
    return date;
}

我以前做过类似的事情,只需使用 Parallel.ForEach 即可。带有 ParallelOptions 的重载允许您设置 MaxDegreeOfParallelism,但您需要首先使用 Enumerable.SelectMany 来展开每个模型中的天数。 - Biscuits
那么,如果我对我的集合使用SelectMany,那么我会得到一个包含所有日期的大列表,但它们并没有与模型本身相关联,是吗?它是否天然地将它们关联起来,还是我需要做一些特殊的事情来确保它们之间有关联? - DevDevDev
有一个SelectMany的重载函数,它允许您指定一个结果选择器来将父对象和元素中的信息投影到一个新对象中。使用Linq语法可以更轻松地完成这项工作。请记住,Parallel.ForEach允许您异步运行操作(或任务),并且您仍然可以在每次迭代中等待它们完成。 - Biscuits
看起来这个 Parallel.ForEachAsync 的实现更适合使用 await 来等待你的 async 方法。https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/ - Biscuits
1个回答

3

我觉得你完全不理解SemaphoreSlim的作用。

  1. Your semaphore is a method-level based local variable, so every GetWeatherDataAsync method call will spawn 10 calls to your API, without waiting for other client.
  2. Moreover, your code will deadlock, if models.Count > 10, because you waiting for semaphore in each iteration, those requests are being stacked, and for 11th your thread will hang forever, as you aren't releasing semaphore:

    var semaphore = new SemaphoreSlim(10);
    
    foreach (var item in Enumerable.Range(0, 15))
    {
        // will stop after 9
        await semaphore.WaitAsync();
        Console.WriteLine(item);
    }
    
您真正需要做的是将信号量移到实例级别(甚至带有static关键字的类型级别),并在GetWeatherDataAsync内等待它,在finally块中放置Release
至于Parallel.Foreach - 在这种情况下,您不应该使用它,因为它不知道async方法(它是在async/await 之前引入的),而您的方法看起来不像是CPU绑定的。

你关于使用Parallel.ForEach的观点是错误的。一个框架库怎么会在它变得有用之前依赖于C#语言特性呢? - Biscuits
1
哦,我明白你的意思了。所以你不能在每个迭代中使用 await - Biscuits
这取决于你的代码。如果你每个请求创建一个实例,你需要一个类型级别的信号量。 - VMAtm
我上面发布的示例是静态的。我可以将它们制作成实例方法。我相信只需要一个实例来处理所有请求。我看不出创建多个实例的理由。因此,在这种情况下,最好在实例级别上创建一个信号量,并将其传递到我的实例中,然后在我的方法中使用它,这将防止此方法中的任何并发连接达到11+? - DevDevDev
在您的示例代码中,semaphore 是一个本地实例,这意味着它将为每个线程单独创建。如果需要,请将其移出作为静态变量,然后在GetWeatherDataAsync内等待它。现在,您的信号量根本不起作用。 - VMAtm
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接