如何在 .Net Core Web API 中限制并发的外部 API 调用?

5

我目前正在开发一个.net core web api项目,其中从外部web api获取数据。他们在其端口设置了25个并发速率限制器(允许进行25个并发api调用),第26个API调用将失败。

因此,我希望在我的web API项目中实现并发API速率限制器,并需要跟踪第26个API调用失败并需要重试它(可能是get或post调用)。 我的api代码中有多个get请求和post请求。

以下是我的httpservice.cs在我的web api中。

public HttpClient GetHttpClient()
{
    HttpClient client = new HttpClient
    {
        BaseAddress = new Uri(APIServer),
    };
    client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
    client.DefaultRequestHeaders.Add("Authorization", ("Bearer " + Access_Token));
    return client;
}
private HttpClient Client;
public async Task<Object> Get(string apiEndpoint)
{

    Client = GetHttpClient();
    HttpResponseMessage httpResponseMessage = await Client.GetAsync(apiEndpoint);
    if (httpResponseMessage.IsSuccessStatusCode)
    {
        Object response = await httpResponseMessage.Content.ReadAsStringAsync();
        return response;
    }
    else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        //need to track failed calls                    
        return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
    }
}

public async Task<Object> Post(string apiEndpoint, Object request)
{
    Client = GetHttpClient();
    HttpResponseMessage httpResponseMessage = await Client.PostAsJsonAsync(apiEndpoint, request);
    if (httpResponseMessage.IsSuccessStatusCode)
    {
        return await httpResponseMessage.Content.ReadAsAsync<Object>();
    }

    else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        //need to track
        return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
    }
} 

如何在上述示例中限制并发API调用
SemaphoreSlim _semaphoregate = new SemaphoreSlim(25);
await _semaphoregate.WaitAsync();      
_semaphoregate.Release();  

这能行吗?

AspNetCoreRateLimit nuget 包在这里有用吗?它会限制上面示例中的并发吗?

请帮忙。


1
我知道的最简单的解决方案是使用类似于SemaphoreSlim或TPL数据流块的东西来实现受限并发,最大并行度。您是否已经考虑过使用这种方法? - Enrico Massone
@EnricoMassone 是的,考虑过 SemaphoreSlim,就像问题中展示的那样,但这里会起作用吗?但是如何在这里使用?对于获取和发布请求都是如何使用的? - A_developer
2
一个想法是使用Polly库,并通过适当的策略执行您的代码。您可以在这里看到一个例子。 - Theodor Zoulias
在C++中,我正在调用一个公共API,该API每秒钟有10个调用限制。 - Biagio Paruolo
1个回答

2

我所知道的限制对一段代码进行并发访问的最简单解决方案是使用SemaphoreSlim对象来实现节流机制。

您可以考虑采用下面展示的方法,但需要根据当前场景进行适当调整(以下代码仅为了向您展示一般思路而过于简化):

public class Program 
{
    private static async Task DoSomethingAsync()
    {
      // this is the code for which you want to limit the concurrent execution
    }

    // this is meant to guarantee at most 5 concurrent execution of the code in DoSomethingAsync
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(5); 

    // here we execute 100 calls to DoSomethingAsync, by ensuring that at most 5 calls are executed concurrently
    public static async Task Main(string[] args) 
    {
        var tasks = new List<Task>();
        
        for(int i = 0; i < 100; i++) 
        {
            tasks.Add(ThrottledDoSomethingAsync());
        }
        
        await Task.WhenAll(tasks);
    }

    private static async Task ThrottledDoSomethingAsync()
    {
      await _semaphore.WaitAsync();
      
      try
      {
        await DoSomethingAsync();
      }
      finally
      {
        _semaphore.Release();
      }
    }
}

这里有关于 SemaphoreSlim 类的文档。

如果你想要一个类似于 ForEachAsync 方法,可以考虑阅读我在此问题上的提问

如果你正在寻找一种优雅的解决方案,以使用 SemaphoreSlim 作为你的服务的节流机制,那么可以考虑为服务本身定义一个接口并使用装饰器模式。在装饰器中,你可以实现使用如上面所示的 SemaphoreSlim 的限流逻辑,同时保持服务逻辑简单和无修改在服务的核心实现中。这与你的问题没有直接关系,只是一个提示,写下你的HTTP服务的实际实现。 SemaphoreSlim 用作限流机制的核心思想是上面代码中所展示的。

最基本的代码调整如下:

public sealed class HttpService
{
    // this must be static in order to be shared between different instances
    // this code is based on a max of 25 concurrent requests to the API
    // both GET and POST requests are taken into account (they are globally capped to a maximum of 25 concurrent requests to the API)
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(25);

    public HttpClient GetHttpClient()
    {
        HttpClient client = new HttpClient
        {
            BaseAddress = new Uri(APIServer),
        };
        client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        client.DefaultRequestHeaders.Add("Authorization", ("Bearer " + Access_Token));
        return client;
    }

    private HttpClient Client;

    public async Task<Object> Get(string apiEndpoint)
    {

        Client = GetHttpClient();
        HttpResponseMessage httpResponseMessage = await this.ExecuteGetRequest(apiEndpoint);
        if (httpResponseMessage.IsSuccessStatusCode)
        {
            Object response = await httpResponseMessage.Content.ReadAsStringAsync();
            return response;
        }
        else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        {
            //need to track failed calls                    
            return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
        }
    }

    private async Task<HttpResponseMessage> ExecuteGetRequest(string url)
    {
        await _semaphore.WaitAsync();

        try
        {
            return await this.Client.GetAsync(url);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async Task<Object> Post(string apiEndpoint, Object request)
    {
        Client = GetHttpClient();
        HttpResponseMessage httpResponseMessage = await this.ExecutePostRequest(apiEndpoint, request);
        if (httpResponseMessage.IsSuccessStatusCode)
        {
            return await httpResponseMessage.Content.ReadAsAsync<Object>();
        }

        else if (httpResponseMessage.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        {
            //need to track
            return StatusCode(httpResponseMessage.StatusCode.GetHashCode());
        }
    }

    private async Task<HttpResponseMessage> ExecutePostRequest(string url, Object request)
    {
        await _semaphore.WaitAsync();

        try
        {
            return await this.Client.PostAsJsonAsync(url, request);
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

重要提示:您发布的代码每次需要执行对API的HTTP请求时都会创建一个全新的HttpClient实例。这会带来一些问题,超出了您提问的范围。我强烈建议您阅读这篇文章这篇文章


我怎样才能在我的代码中做同样的事情?这只是我的疑问。 - A_developer
1
@athuman,你在使用ASP.NET Core吗?用的是哪个版本? - Enrico Massone
1
@athuman在我的答案编辑中给了您一些额外的提示。 - Enrico Massone
感谢您的回答。我正在使用 .net core 3.1 web api。 - A_developer
我可以使用哪个客户端工厂?能给一个名字吗?如果可以,如何在那里传递身份验证令牌?在上面的例子中,get和post请求都受到并发限制。所以我需要通过this.ExecuteGetRequest传递它们吗?请帮忙。 - A_developer
以上代码限制了HTTP POST和HTTP GET请求。有一个方法用于POST,一个方法用于GET。不是吗? - Enrico Massone

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接