实现可重试代码块的正确完成

54

提示:各位,这个问题不是关于如何实现重试策略的,而是关于正确完成TPL Dataflow块的。

这个问题主要是我之前的一个问题Retry policy within ITargetBlock的延续。对于这个问题的答案是@svick的聪明解决方案,利用了TransformBlock(源)和TransformManyBlock(目标)。唯一剩下的问题是以正确的方式完成这个块:等待所有重试都完成,然后完成目标块。以下是我的代码(只是片段,请勿过多关注非线程安全的retries集合):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

这个想法是进行一些轮询并验证是否仍有待处理的消息,并且没有需要重试的消息。但在这种解决方案中,我不喜欢轮询的想法。

是的,我可以将添加/删除重试逻辑封装到单独的类中,甚至可以在重试集合变为空时执行某些操作,但如何处理target.InputCount > 0条件呢?没有这样的回调函数会在块中没有待处理消息时被调用,因此似乎使用带有小延迟的循环验证target.ItemCount是唯一的选择。

有人知道更聪明的方法吗?


1
看起来ITargetBlock通过AsObserver扩展方法返回的观察者支持基于推送的通知。请参阅http://msdn.microsoft.com/en-us/library/hh160359.aspx和http://msdn.microsoft.com/en-us/library/ee850490.aspx。 - JamieSee
4
@Nullius,重试逻辑基于异常 - 在出现瞬态错误时重试。我认为在try块中使用重试逻辑并不是一个好主意,因为你不知道错误类型以及这种错误是否是瞬态的。 - Oleks
你是否愿意使用企业库的重试策略?没必要重新发明轮子,你可以直接扩展一个。 - georgiosd
@georgiosd:来自EntLib的瞬时故障处理应用程序块(或瞬时故障处理核心nuget包)无法满足我的需求-这在我的此前和本问题中有所解释。 - Oleks
@ Alex 回复:@Nullius 我认为他指的是在重试失败后才应该引发异常,因此如果您知道条件x会导致异常,则在执行操作之前进行测试,然后进入等待重试循环,只有当仍然失败时才引发异常。引发异常是昂贵的,因此只有在问题完全不可预测或需要传递有关错误的复杂信息以便正确处理调用例程时才应该这样做。 - MikeT
显示剩余3条评论
2个回答

2
也许一个ManualResetEvent可以帮你达成目的。
TransformManyBlock中添加一个公共属性。
private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }

这是您要的内容:
var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);

            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);

                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    //Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();

    target.Complete();
});

我不确定您的 target.InputCount 是在哪里设置的。所以在您更改 target.InputCount 的地方,您可以添加以下代码:
if(InputCount == 0)  Signal.Set();

问题是:target.InputCount 是一个黑盒子 - 它是 TPL Dataflow 中 TransformManyBlock 的只读属性。 - Oleks

1

结合hwcverwe的回答和JamieSee的评论可能是最理想的解决方案。

首先,您需要创建多个事件:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

接下来,您需要创建一个观察者,并订阅TransformManyBlock,这样当相关事件发生时,您就会收到通知:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

Observable 可以非常简单:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }

你可以等待信号,或完成(所有源项目的耗尽),或两者兼而有之

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });

您可以检查WaitAll的结果值,以了解哪个事件被设置,并相应地做出反应。您还可以将其他事件添加到代码中,将它们传递给观察者,以便在需要时设置它们。例如,当出现错误时,您可以区分自己的行为并做出不同的响应。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接