提示:各位,这个问题不是关于如何实现重试策略的,而是关于正确完成TPL Dataflow块的。
这个问题主要是我之前的一个问题Retry policy within ITargetBlock的延续。对于这个问题的答案是@svick的聪明解决方案,利用了TransformBlock
(源)和TransformManyBlock
(目标)。唯一剩下的问题是以正确的方式完成这个块:等待所有重试都完成,然后完成目标块。以下是我的代码(只是片段,请勿过多关注非线程安全的retries
集合):
var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
while (target.InputCount > 0 || retries.Any())
await Task.Delay(100);
target.Complete();
});
这个想法是进行一些轮询并验证是否仍有待处理的消息,并且没有需要重试的消息。但在这种解决方案中,我不喜欢轮询的想法。
是的,我可以将添加/删除重试逻辑封装到单独的类中,甚至可以在重试集合变为空时执行某些操作,但如何处理target.InputCount > 0
条件呢?没有这样的回调函数会在块中没有待处理消息时被调用,因此似乎使用带有小延迟的循环验证target.ItemCount
是唯一的选择。
有人知道更聪明的方法吗?
try
块中使用重试逻辑并不是一个好主意,因为你不知道错误类型以及这种错误是否是瞬态的。 - Oleks