速率限制 - 不正确的限制

4

我有一个RabbitMQ队列,里面填满了成千上万条消息。我需要我的消费者每秒消费1条消息,因此我使用Polly实现了一个速率限制策略。我的配置如下:

public static IAsyncPolicy GetPolicy(int mps)
{
    if (mps <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(mps));
    }
    
    return Policy
        .HandleResult<HttpResponseMessage>(result => {
            return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
        })
        .Or<Polly.RateLimit.RateLimitRejectedException>()
        .WaitAndRetryForeverAsync((retryNum, context) => {
            Console.WriteLine($"Retrying. Num: {retryNum}");
            return TimeSpan.FromSeconds(1);
        }).WrapAsync(
            Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}

其中mps为1。

我注意到以下情况:

  • 一开始,队列中有50个消息在1秒内被消耗掉了,速率限制器似乎没有起作用
  • 接着,每秒只消耗一个消息,WaitAndRetryForeverAsync被执行了多次(十几次)

如果我将mps设置为50,则会发生以下情况:

  • 一开始立即消耗50条消息
  • 然后每秒只消耗20条消息(而不是预期的50条)

Policy.RateLimitAsync调用是否存在bug?
我是否操作有误?


你想通过WaitAndRetryForeverAsync实现什么? - Peter Csala
你的消费者是单线程还是多线程? - Peter Csala
我不知道。这是一个用于RabbitMQ队列的标准“EasyNetQ”消费者。我认为它是多线程的。(我几乎确定它是) - Katia S.
好的,那么你希望达到什么目标?是在每个线程中发送 mps 条信息还是在每个“消费者组”中发送 mps 条信息?您是否希望在消费者线程之间共享此策略? - Peter Csala
我需要我的消费者每秒监听“mps”消息。但它没有。我告诉它监听50个/秒,但实际上它只以20个/秒的速率监听。 - Katia S.
显示剩余2条评论
2个回答

3

我必须在这里强调,写作时速率限制策略被认为是非常新的。它自7.2.3版本开始提供,这是当前稳定版本。因此,它不像其他策略一样成熟。


根据它的文档,我认为它真正的工作原理不清楚。

让我通过一个简单的例子来说明我的意思。

var localQueue = new Queue<int>();
for (int i = 0; i < 1000; i++)
{
    localQueue.Enqueue(i);
}

RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1));

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        Thread.Sleep(10);
    });
}

如果您运行此程序,它将打印01,然后崩溃并显示RateLimitRejectedException
  • 为什么?
  • 为什么不打印前20个数字,然后再崩溃?
答案是该策略被定义为可以防止滥用。我们在两个操作之间只等待10毫秒。从策略的角度来看,这被视为一种滥用
因此,如果没有这种滥用预防,我们将在200毫秒内消耗允许的带宽,并且在剩余的800毫秒内无法执行任何进一步的操作。

将睡眠时间更改为49

结果会更或多或少相同
  • 它可能能够达到除1以外的其他数字,但肯定达不到20

将睡眠时间更改为50

它可以愉快地消耗整个队列而永远不会停止
为什么?因为1000毫秒/ 20允许执行= 50毫秒令牌
这意味着您的允许执行在时间上分布均匀

允许突发

让我们来试试突发模式。

将maxBurst设置为2,sleep设置为49。

RateLimitPolicy rateLimiter = Policy
    .RateLimit(20, TimeSpan.FromSeconds(1), 2);

while (localQueue.TryPeek(out _))
{
    rateLimiter.Execute(() =>
    {
        Console.WriteLine(localQueue.Dequeue());
        Thread.Sleep(49);
    });
}

在这种模式下,异常将在10-20之间抛出。(多次运行它)
让我们将 maxBurst 改为4。它将在20-60之间崩溃...

结论

因此,速率限制器并不像你可能期望的那样工作。它允许平均分布的流量。


2
我找到了解决我的问题的答案(多亏了Polly's GitHub)。正确的配置如下:
public static IAsyncPolicy<HttpResponseMessage> GetPolicy(int mps)
{
    if (mps <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(mps));
    }

    IAsyncPolicy<HttpResponseMessage> limit = Policy
        .RateLimitAsync(mps, TimeSpan.FromSeconds(1), mps,
            (retryAfter, context) => {
                var response = new HttpResponseMessage(System.Net.HttpStatusCode.TooManyRequests);
                response.Headers.Add("Retry-After", retryAfter.Milliseconds.ToString());
                return response;
            });

    IAsyncPolicy<HttpResponseMessage> retry = Policy
        .HandleResult<HttpResponseMessage>(result => result.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        .Or<Polly.RateLimit.RateLimitRejectedException>()
        .WaitAndRetryForeverAsync((retryNum) => {
            return TimeSpan.FromSeconds(1);
        });

    var resilienceStrategy = Policy.WrapAsync(retry, limit);

    return resilienceStrategy;
}

1
根据MDN的说明,Retry-After应该使用秒而不是毫秒。 - Enes Sadık Özbek
你应该读取 retryafter 属性,而不是在重试策略中返回 TimeSpan.FromSeconds(1)。 - Luis Lavieri

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接