我有一个RabbitMQ队列,里面填满了成千上万条消息。我需要我的消费者每秒消费1条消息,因此我使用Polly实现了一个速率限制策略。我的配置如下:
public static IAsyncPolicy GetPolicy(int mps)
{
if (mps <= 0)
{
throw new ArgumentOutOfRangeException(nameof(mps));
}
return Policy
.HandleResult<HttpResponseMessage>(result => {
return result.StatusCode == System.Net.HttpStatusCode.TooManyRequests;
})
.Or<Polly.RateLimit.RateLimitRejectedException>()
.WaitAndRetryForeverAsync((retryNum, context) => {
Console.WriteLine($"Retrying. Num: {retryNum}");
return TimeSpan.FromSeconds(1);
}).WrapAsync(
Policy.RateLimitAsync(mps, TimeSpan.FromSeconds(1)));
}
其中mps
为1。
我注意到以下情况:
- 一开始,队列中有50个消息在1秒内被消耗掉了,速率限制器似乎没有起作用
- 接着,每秒只消耗一个消息,
WaitAndRetryForeverAsync
被执行了多次(十几次)
如果我将mps
设置为50,则会发生以下情况:
- 一开始立即消耗50条消息
- 然后每秒只消耗20条消息(而不是预期的50条)
Policy.RateLimitAsync
调用是否存在bug?
我是否操作有误?
WaitAndRetryForeverAsync
实现什么? - Peter Csalamps
条信息还是在每个“消费者组”中发送mps
条信息?您是否希望在消费者线程之间共享此策略? - Peter Csala