实际上,为了正确处理速率限制和超时问题,您需要处理一些其他异常 - 尤其是在使用较新的MongoDB V3.6端点(而不是较旧的V3.2端点)时。
- 对于V3.2端点:您需要关注的两个异常是 `MongoCommandException` 和 `MongoExecutionTimeoutException`。`MongoCommandException` 的 `Result` 属性是一个 `BsonDocument`,该文档中包含一个 `StatusCode` 字段,您可以用它来检测429。另外,根据我的测试,我发现还必须处理"Http服务不可用"(1)和"操作超出时间限制"(50)这两个状态码。
- 对于V3.6端点:您可能还需要处理 `MongoWriteException` 和 `MongoBulkWriteException`。这些异常的消息中(并非总是)包含 `RetryAfterMs=` 值。不幸的是,这个值似乎并没有直接通过类属性公开——很可能是因为这是CosmosDB特有的功能,所以MongoDB驱动程序定义的异常中没有对应的属性。
下面的代码面向 .NET Standard 2.0 编写,应该可以为您提供一个很好的起点。根据您的实际情况和测试结果,您肯定需要调整其中的一些常量。
/// <summary>
/// Polly retry policies for MongoDB driver exceptions raised when the server
/// throttles requests or an operation times out.
/// </summary>
public static class Policies
{
    /// <summary>HTTP-style status code that signals throttling (429).</summary>
    public const int HttpThrottleErrorCode = 429;

    /// <summary>Status code observed for "Http service is unavailable".</summary>
    public const int HttpServiceIsUnavailable = 1;

    /// <summary>Status code observed for "operation exceeded time limit".</summary>
    public const int HttpOperationExceededTimeLimit = 50;

    /// <summary>Driver error code that the policies below treat as a rate-limit signal.</summary>
    public const int RateLimitCode = 16500;

    /// <summary>Token that precedes the server-suggested delay inside an exception message.</summary>
    public const string RetryAfterToken = "RetryAfterMs=";

    /// <summary>Maximum number of retries performed by every retry policy in this class.</summary>
    public const int MaxRetries = 10;

    /// <summary>Length of <see cref="RetryAfterToken"/>.</summary>
    public static readonly int RetryAfterTokenLength = RetryAfterToken.Length;

    // System.Random is not thread-safe and this instance is shared by every retry
    // that runs in the process, so all access goes through NextJitterMilliseconds.
    private static readonly object JitterLock = new object();
    private static readonly Random JitterSeed = new Random();

    /// <summary>A policy that executes the delegate without any retry handling.</summary>
    public static readonly IAsyncPolicy NoPolicy = Policy.NoOpAsync();

    /// <summary>
    /// Builds a sleep-duration provider that waits
    /// <c>min(exponentialBackoffInSeconds ^ retryAttempt, maxBackoffTimeInSeconds)</c> seconds
    /// plus a random jitter of 0-999 milliseconds.
    /// </summary>
    /// <param name="exponentialBackoffInSeconds">Base that is raised to the power of the retry attempt.</param>
    /// <param name="maxBackoffTimeInSeconds">Upper bound, in seconds, for the exponential part of the delay.</param>
    /// <returns>A function that maps a retry attempt number to the delay before that retry.</returns>
    public static Func<int, TimeSpan> SleepDurationProviderWithJitter(double exponentialBackoffInSeconds, int maxBackoffTimeInSeconds) =>
        retryAttempt =>
            TimeSpan.FromSeconds(Math.Min(Math.Pow(exponentialBackoffInSeconds, retryAttempt), maxBackoffTimeInSeconds))
            + TimeSpan.FromMilliseconds(NextJitterMilliseconds(0, 1000));

    /// <summary>Default backoff: base 1.5, exponential part capped at 23 seconds, plus jitter.</summary>
    public static readonly Func<int, TimeSpan> DefaultSleepDurationProviderWithJitter =
        SleepDurationProviderWithJitter(1.5, 23);

    /// <summary>
    /// Retries a <c>MongoCommandException</c> whose code is <see cref="RateLimitCode"/> and whose
    /// result document reports a retryable status.
    /// </summary>
    public static readonly IAsyncPolicy MongoCommandExceptionPolicy = Policy
        .Handle<MongoCommandException>(e =>
        {
            if (e.Code != RateLimitCode || !(e.Result is BsonDocument bsonDocument))
            {
                return false;
            }

            // An integer StatusCode, when present, decides on its own.
            if (bsonDocument.TryGetValue("StatusCode", out var statusCode) && statusCode.IsInt32)
            {
                switch (statusCode.AsInt32)
                {
                    case HttpThrottleErrorCode:
                    case HttpServiceIsUnavailable:
                    case HttpOperationExceededTimeLimit:
                        return true;
                    default:
                        return false;
                }
            }

            // Otherwise a boolean IsValid, when present, decides.
            if (bsonDocument.TryGetValue("IsValid", out var isValid) && isValid.IsBoolean)
            {
                return isValid.AsBoolean;
            }

            // Neither field is usable: the rate-limit code alone is enough to retry.
            return true;
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries a <c>MongoExecutionTimeoutException</c> whose code is <see cref="RateLimitCode"/>
    /// or <see cref="HttpOperationExceededTimeLimit"/>.
    /// </summary>
    public static readonly IAsyncPolicy ExecutionTimeoutPolicy = Policy
        .Handle<MongoExecutionTimeoutException>(e =>
            e.Code == RateLimitCode || e.Code == HttpOperationExceededTimeLimit
        )
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries a <c>MongoWriteException</c> when its write error, or any write error of an inner
    /// <c>MongoBulkWriteException</c>, carries <see cref="RateLimitCode"/>. The delay is taken from
    /// a <see cref="RetryAfterToken"/> hint in the exception (or inner exception) message when
    /// one can be parsed, and from <see cref="DefaultSleepDurationProviderWithJitter"/> otherwise.
    /// </summary>
    public static readonly IAsyncPolicy MongoWriteExceptionPolicy = Policy
        .Handle<MongoWriteException>(e =>
        {
            return e.WriteError?.Code == RateLimitCode
                || (e.InnerException is MongoBulkWriteException bulkException &&
                    bulkException.WriteErrors.Any(error => error.Code == RateLimitCode));
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
            {
                // Prefer the hint on the outer message, then the one on the inner exception.
                var timeToWait = ExtractTimeToWait(e.Message)
                    ?? ExtractTimeToWait(e.InnerException?.Message);
                return timeToWait ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
            },
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// Retries a <c>MongoBulkWriteException</c> when any of its write errors carries
    /// <see cref="RateLimitCode"/>. The delay is taken from a <see cref="RetryAfterToken"/> hint
    /// in the exception message when one can be parsed, and from
    /// <see cref="DefaultSleepDurationProviderWithJitter"/> otherwise.
    /// </summary>
    public static readonly IAsyncPolicy MongoBulkWriteExceptionPolicy = Policy
        .Handle<MongoBulkWriteException>(e =>
        {
            return e.WriteErrors.Any(error => error.Code == RateLimitCode);
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
            {
                var timeToWait = ExtractTimeToWait(e.Message);
                return timeToWait ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
            },
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// Returns a random integer in <c>[minInclusive, maxExclusive)</c> from the shared generator,
    /// serialising access because <see cref="Random"/> is not safe for concurrent use.
    /// </summary>
    private static int NextJitterMilliseconds(int minInclusive, int maxExclusive)
    {
        lock (JitterLock)
        {
            return JitterSeed.Next(minInclusive, maxExclusive);
        }
    }

    /// <summary>
    /// Parses the integer that follows <see cref="RetryAfterToken"/> in
    /// <paramref name="messageToParse"/> and converts it to a delay with 100-999 ms of jitter added.
    /// </summary>
    /// <param name="messageToParse">Exception message to scan; may be null or empty.</param>
    /// <returns>
    /// The delay to wait, or <c>null</c> when the message is null/empty, the token is absent,
    /// or the value is not a non-negative integer.
    /// </returns>
    private static TimeSpan? ExtractTimeToWait(string messageToParse)
    {
        if (string.IsNullOrEmpty(messageToParse))
        {
            return null;
        }

        var tokenPos = messageToParse.IndexOf(RetryAfterToken, StringComparison.OrdinalIgnoreCase);
        if (tokenPos < 0)
        {
            return null;
        }

        var valueStart = tokenPos + RetryAfterTokenLength;

        // The value runs up to the next comma, or to the end of the message when the
        // hint is the last element.
        var valueEnd = messageToParse.IndexOf(',', valueStart);
        if (valueEnd < 0)
        {
            valueEnd = messageToParse.Length;
        }

        var rawValue = messageToParse.Substring(valueStart, valueEnd - valueStart);

        // The hint is machine-generated, so parse it culture-independently. A negative
        // value would produce an invalid delay and is therefore ignored.
        if (!int.TryParse(
                rawValue,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var timeToWaitInMs)
            || timeToWaitInMs < 0)
        {
            return null;
        }

        return TimeSpan.FromMilliseconds(timeToWaitInMs)
            + TimeSpan.FromMilliseconds(NextJitterMilliseconds(100, 1000));
    }

    /// <summary>Combined policy for the V3.2 endpoint: command and execution-timeout exceptions.</summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_2 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy);

    /// <summary>Combined policy for the V3.6 endpoint: the V3.2 set plus write and bulk-write exceptions.</summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_6 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy, MongoWriteExceptionPolicy, MongoBulkWriteExceptionPolicy);
}
/// <summary>
/// Retry policy exposed as the default; initialised to
/// <c>Policies.DefaultPolicyForMongo3_6</c> and replaceable through the setter.
/// </summary>
public static IAsyncPolicy DefaultPolicy { get; set; } = Policies.DefaultPolicyForMongo3_6;