ITargetBlock<TInput>中的重试策略

12
我需要在工作流中引入重试策略。假设有3个块以如下方式连接:
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

所以有一个缓冲区累积数据,然后将其发送到处理不超过3个项目的转换块,然后将结果发送到操作块。
在处理转换块期间可能会出现瞬态错误,如果错误是瞬态的,则要重试该块多次。
我知道块通常是不可重试的(传递给块的委托可以被设置为可重试)。其中一个选项是包装传递的委托以支持重试。
我也知道有一个非常好的库TransientFaultHandling.Core,它提供了对瞬态故障的重试机制。这是一个很棒的库,但不适用于我的情况。如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,那么转换块内部的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息。想象一下,如果所有3条消息都进入重试(假设下一次重试尝试将在2分钟后进行)并失败,那么转换块将被卡住,直到至少有一条消息离开转换块。
我看到的唯一解决方案是扩展TranformBlock(实际上,ITargetBlock也足够了),然后手动重试(例如从这里开始)。
do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );

将消息再次放入转换块中,并延迟,但在这种情况下,重试上下文(剩余的重试次数等)也应传递到该块中。听起来太复杂了...
有没有人看到实现工作流块的重试策略的更简单方法?

关于我刚刚大幅改进的“(like from here):”,你可能想要看一下。 - AgentFire
3个回答

16

我认为你必须要这样做,需要跟踪消息的重试次数并安排重试尝试。

但是你可以通过将其封装在单独的方法中来改进它。例如:

// it's a private class, so public fields are okay
private class RetryingMessage<T>
{
    public T Data;
    public int RetriesRemaining;
    public readonly List<Exception> Exceptions = new List<Exception>();
}

public static IPropagatorBlock<TInput, TOutput>
    CreateRetryingBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, int numberOfRetries,
    TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
{
    var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
        input => new RetryingMessage<TInput>
        { Data = input, RetriesRemaining = numberOfRetries });

    // TransformManyBlock, so that we can propagate zero results on failure
    TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
    target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
        async message =>
        {
            try
            {
                return new[] { await transform(message.Data) };
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    failureHandler(message.Exceptions);
                }
                else
                {
                    message.RetriesRemaining--;
                    Task.Delay(retryDelay)
                        .ContinueWith(_ => target.Post(message));
                }
                return null;
            }
        });

    source.LinkTo(
        target, new DataflowLinkOptions { PropagateCompletion = true });

    return DataflowBlock.Encapsulate(source, target);
}

我已添加代码来跟踪异常,因为我认为失败不应被忽略,至少应该记录下来。
另外,这段代码与完成度不太兼容:如果有重试正在等待它们的延迟,并且您使用 Complete()块,则会立即完成并且重试将丢失。如果这对您造成问题,您需要跟踪未完成的重试,并在 source 完成并且没有重试等待时完成 target 。

谢谢你的出色回答(特别是关于完成的说明),如果可以的话,我会点赞两次甚至三次。我已经实现了一个解决方案,通过使用类似以下代码的方式来正确(希望如此)完成:我有一组正在重试的消息(添加和删除),以及一个源完成的继续操作,执行轮询(例如await Task.Delay(100)),并验证目标块是否具有输入消息和一组空消息。当两个条件都为真时,我只需完成目标块。它能否在不进行轮询的情况下实现? - Oleks
@Alex,你不需要轮询,而是需要在正确的时间检查条件。但我认为你也需要知道是否有消息等待处理(因为其中一些可能会失败),我不确定如何合理地做到这一点。 - svick

3
除了svick的优秀答案外,还有其他几个选项:
  1. 您可以使用 TransientFaultHandling.Core 库 - 只需将 MaxDegreeOfParallelism 设置为 Unbounded,以便其他消息可以通过。
  2. 您可以修改块输出类型以包括故障指示和重试计数,并创建数据流循环,通过传递一个过滤器给 LinkTo 来检查是否需要进行另一次重试。这种方法更加复杂;如果块正在进行重试,您必须为其添加延迟,并添加一个 TransformBlock 来删除其余网格的故障/重试信息。

如果我将 MaxDegreeOfParallelism 设置为 Unbounded,并且例如我的缓冲区接收到 20K 条输入消息,并且假设其中一半需要重试 - 我认为整个系统会卡住。我是正确的吗? - Oleks
前面的评论还有一个:我认为当块只需等待并且可能会处理另一条消息时,保留该块并不是一个很好的主意-如果我错了,请纠正我。 - Oleks
@Alex: MaxDegreeOfParallelism只是一个最大值,它不会导致饥饿问题。我不确定你所说的"保留"区块是什么意思;通常情况下,只要可能还有更多数据,你会一直保留数据流图。 - Stephen Cleary
据我理解,当使用“TransientFaultHandling.Core”中的“retryPolicy.ExecuteAsync”方法时,它会在块内停止执行并在重试间隔后重试。这意味着,如果该块最多可以同时处理3条消息,并且其中一条消息正在等待重试,则该块将无法获取新消息而是进行重试,直到重试失败或完成。 - Oleks
@Alex:MaxDegreeOfParallelism是一个块上的设置,因此如果您将其设置为3,然后有一条消息等待重试,该块就可以自由地启动另一个任务来处理下一条消息。附注:我假设ExecuteAsync会表现得很聪明(即使用Task.Delay而不是Thread.Sleep)。如果它很愚蠢(使用Thread.Sleep),那么解决方案仍将起作用,但效率会非常低下。 - Stephen Cleary
显示剩余4条评论

2
以下是两种方法CreateRetryTransformBlockCreateRetryActionBlock,它们基于以下假设运作:
  1. 调用者希望处理所有项目,即使其中一些项目反复失败。
  2. 调用者对所有发生的异常感兴趣,即使对于最终成功的项目也是如此(对于CreateRetryActionBlock不适用)。
  3. 调用者可能希望设置总重试次数的上限,之后该块应转换为故障状态。
  4. 调用者希望能够设置正常块的所有可用选项,包括MaxDegreeOfParallelismBoundedCapacityCancellationTokenEnsureOrdered,以及与重试功能相关的选项。
下面的实现使用SemaphoreSlim来控制首次尝试的操作和在其延迟持续时间过去后重试的先前失败操作之间的并发级别。
public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The delay duration before retrying an item.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public readonly struct RetryResult<TInput, TOutput>
{
    public readonly TInput Input { get; }
    public readonly TOutput Output { get; }
    public readonly bool Success { get; }
    public readonly Exception[] Exceptions { get; }

    public bool Failed => !Success;
    public Exception FirstException => Exceptions != null ? Exceptions[0] : null;
    public int Attempts =>
        Exceptions != null ? Exceptions.Length + (Success ? 1 : 0) : 1;

    public RetryResult(TInput input, TOutput output, bool success,
        Exception[] exceptions)
    {
        Input = input;
        Output = output;
        Success = success;
        Exceptions = exceptions;
    }
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static IPropagatorBlock<TInput, RetryResult<TInput, TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.RetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.RetryDelay));
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    var exceptionsCount = 0;
    var semaphore = new SemaphoreSlim(
        dataflowBlockOptions.MaxDegreeOfParallelism);

    async Task<(TOutput, Exception)> ProcessOnceAsync(TInput item)
    {
        await semaphore.WaitAsync(); // Preserve the SynchronizationContext
        try
        {
            var result = await transform(item).ConfigureAwait(false);
            return (result, null);
        }
        catch (Exception ex)
        {
            if (maxRetriesTotal != -1)
            {
                if (Interlocked.Increment(ref exceptionsCount) > maxRetriesTotal)
                {
                    throw new RetryLimitException($"The max retry limit " +
                        $"({maxRetriesTotal}) has been reached.", ex);
                }
            }
            return (default, ex);
        }
        finally
        {
            semaphore.Release();
        }
    }

    async Task<Task<RetryResult<TInput, TOutput>>> ProcessWithRetryAsync(
        TInput item)
    {
        // Creates a two-stages operation. Preserves the context on every await.
        var (result, firstException) = await ProcessOnceAsync(item);
        if (firstException == null) return Task.FromResult(
            new RetryResult<TInput, TOutput>(item, result, true, null));
        return RetryStageAsync();

        async Task<RetryResult<TInput, TOutput>> RetryStageAsync()
        {
            var exceptions = new List<Exception>();
            exceptions.Add(firstException);
            for (int i = 2; i <= maxAttemptsPerItem; i++)
            {
                await Task.Delay(retryDelay, cancellationToken);
                var (result, exception) = await ProcessOnceAsync(item);
                if (exception != null)
                    exceptions.Add(exception);
                else
                    return new RetryResult<TInput, TOutput>(item, result,
                        true, exceptions.ToArray());
            }
            return new RetryResult<TInput, TOutput>(item, default, false,
                exceptions.ToArray());
        };
    }

    // The input block awaits the first stage of each operation
    var input = new TransformBlock<TInput, Task<RetryResult<TInput, TOutput>>>(
        item => ProcessWithRetryAsync(item), dataflowBlockOptions);

    // The output block awaits the second (and final) stage of each operation
    var output = new TransformBlock<Task<RetryResult<TInput, TOutput>>,
        RetryResult<TInput, TOutput>>(t => t, dataflowBlockOptions);

    input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

    // In case of failure ensure that the input block is faulted too,
    // so that its input/output queues are emptied, and any pending
    // SendAsync operations are aborted
    PropagateFailure(output, input);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex) { block2.Fault(ex); }
    }
}

public static ITargetBlock<TInput> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<RetryResult<TInput, object>>();
    block.LinkTo(nullTarget);
    return block;
}

1
对于使用 Stephen Cleary 的优雅且轻量级 Try 类型的实现,而不是这里使用的有些臃肿的 RetryResult 类型,请参考问题。 - Theodor Zoulias
更新:我不再满意上述实现,因为它优先重试重复失败的方式不自然。我目前认为最好的RetryTransformBlock实现是创建一个链接的TransformBlock管道,其长度等于最大重试次数,并使所有消息通过管道。所有TransformBlock都具有相同的逻辑:如果未处理,则处理消息,否则将其与其结果一起向下传播。在我看来,这种配置会产生最自然的行为。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接