我将一个重试块实现从
类似问题的回答中转换为支持Stephen Cleary的
Try
类型作为输入和输出。方法
CreateRetryTransformBlock
返回一个
TransformBlock<Try<TInput>, Try<TOutput>>
,而方法
CreateRetryActionBlock
返回一个几乎是
ActionBlock<Try<TInput>>
的东西。
此外,还有三个选项可用,即
MaxAttemptsPerItem
,
MinimumRetryDelay
和
MaxRetriesTotal
,以及标准
执行选项。
public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
public int MaxAttemptsPerItem { get; set; } = 1;
public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;
public int MaxRetriesTotal { get; set; } = -1;
}
public class RetryLimitException : Exception
{
public RetryLimitException(string message, Exception innerException)
: base(message, innerException) { }
}
public static TransformBlock<Try<TInput>, Try<TOutput>>
CreateRetryTransformBlock<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
if (dataflowBlockOptions == null)
throw new ArgumentNullException(nameof(dataflowBlockOptions));
int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
nameof(dataflowBlockOptions.MaxAttemptsPerItem));
if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
nameof(dataflowBlockOptions.MaxRetriesTotal));
if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
nameof(dataflowBlockOptions.MinimumRetryDelay));
var internalCTS = CancellationTokenSource
.CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);
var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
var taskScheduler = dataflowBlockOptions.TaskScheduler;
var exceptionsCount = 0;
SemaphoreSlim semaphore;
if (maxDOP == DataflowBlockOptions.Unbounded)
{
semaphore = new SemaphoreSlim(Int32.MaxValue);
}
else
{
semaphore = new SemaphoreSlim(maxDOP, maxDOP);
dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;
dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
taskScheduler, maxDOP).ConcurrentScheduler;
}
var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
{
if (item.IsException) return Try<TOutput>.FromException(item.Exception);
var result1 = await ProcessOnceAsync(item);
if (item.IsException || result1.IsValue) return result1;
for (int i = 2; i <= maxAttemptsPerItem; i++)
{
await Task.Delay(retryDelay, internalCTS.Token);
var result = await ProcessOnceAsync(item);
if (result.IsValue) return result;
}
return result1;
}, dataflowBlockOptions);
dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP;
dataflowBlockOptions.TaskScheduler = taskScheduler;
_ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
TaskScheduler.Default);
return block;
async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
{
await semaphore.WaitAsync(internalCTS.Token);
try
{
var result = await item.Map(transform);
if (item.IsValue && result.IsException)
{
ObserveNewException(result.Exception);
}
return result;
}
finally
{
semaphore.Release();
}
}
void ObserveNewException(Exception ex)
{
if (maxRetriesTotal == -1) return;
uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
if (newCount <= (uint)maxRetriesTotal) return;
if (newCount == (uint)maxRetriesTotal + 1)
{
internalCTS.Cancel();
throw new RetryLimitException($"The max retry limit " +
$"({maxRetriesTotal}) has been reached.", ex);
}
throw new OperationCanceledException();
}
}
public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
Func<TInput, Task> action,
RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (action == null) throw new ArgumentNullException(nameof(action));
var block = CreateRetryTransformBlock<TInput, object>(async input =>
{
await action(input).ConfigureAwait(false); return null;
}, dataflowBlockOptions);
var nullTarget = DataflowBlock.NullTarget<Try<object>>();
block.LinkTo(nullTarget);
return block;
}
使用示例:
var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
{
int result = await DownloadAsync(construct);
return result;
}, new RetryExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10,
MaxAttemptsPerItem = 3,
MaxRetriesTotal = 100,
MinimumRetryDelay = TimeSpan.FromSeconds(10)
});
var processBlock = new TransformBlock<Try<int>, Try<int>>(
construct => construct.Map(async value =>
{
return await ProcessAsync(value);
}));
downloadBlock.LinkTo(processBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
为了简化问题,如果一个项目已经重试了最大次数,保留的异常是第一次发生的异常。随后的异常会丢失。在大多数情况下,丢失的异常会与第一个异常相同。
注意:上述实现没有一个有效的输入队列。如果您将这个块用数百万个项目进行输入,内存使用量将会急剧增加。
Result<T>
对象(这会让你的代码变得简单得多),你可以将故障消息重定向到BufferBlocks(而不是Lists),甚至是写入日志记录器的ActionBlocks。 - Panagiotis KanavosLinkTo
接受一个谓词,该谓词可用于将消息定向发送到管道中的下一步或其他块(如 BufferBlock、日志记录块甚至是 NullBlock)。但要小心,因为任何未匹配任何谓词的消息都将留在其块的输出缓冲区中,从而实际上阻塞了管道。 - Panagiotis KanavosresultsBlock
中的消息包含故障,我应该将其重定向到某个BufferBlock吗?如果我听起来很蠢,请原谅我。这对我来说是新东西。 - niksEnsureOrdered
、BoundedCapacity
),而且也不明显如何强制执行同一项的重复尝试之间的特定延迟。但是它是可行的。 - Theodor Zoulias