如何以(线程)安全的方式跟踪TPL管道中的故障项

3

我正在使用TPL管道设计和Stephen Cleary的Try库。简而言之,它将值/异常包装并沿管道漂浮下去。因此,即使在其处理方法中引发了异常的项目,在最后当我await resultsBlock.Completion;时,也具有Status=RunToCompletion。所以我需要其他方式来注册故障项。这是一个小示例:

var downloadBlock = new TransformBlock<int, Try<int>>(construct => Try.Create(() =>
{
    //SomeProcessingMethod();
    return 1;
}));
var processBlock = new TransformBlock<Try<int>, Try<int>>(construct => construct.Map(value =>
{
    //SomeProcessingMethod();
    return 1;
}));
var resultsBlock = new ActionBlock<Try<int>>(construct =>
{
    if (construct.IsException)
    {
        var exception = construct.Exception;
        switch (exception)
        {
            case GoogleApiException gex:
                //_notificationService.NotifyUser("OMG, my dear sir, I think I messed something up:/"
                //Register that this item was faulted, so we know that we need to retry it.
                break;
            default:
                break;
        }
    }
});

一个解决方案是创建一个List<int> FaultedItems;,在我的Exception处理块中插入所有故障项,然后在await resultsBlock.Completion;之后检查列表是否不为空,并为故障项创建新的管道。我的问题是,如果我使用List<int>,如果我决定调整MaxDegreeOfParallelism设置,我是否会有线程安全问题,并且最好使用一些ConcurrentCollection?或者这种方法存在其他缺陷吗?


你误解了Stephen Cleary的解决方案。是数据存在问题,而不是块本身。管道仍然可以工作,因此最终块应该成功完成。如果你不想使用Result<T>对象(这会让你的代码变得简单得多),你可以将故障消息重定向到BufferBlocks(而不是Lists),甚至是写入日志记录器的ActionBlocks。 - Panagiotis Kanavos
1
LinkTo 接受一个谓词,该谓词可用于将消息定向发送到管道中的下一步或其他块(如 BufferBlock、日志记录块甚至是 NullBlock)。但要小心,因为任何未匹配任何谓词的消息都将留在其块的输出缓冲区中,从而实际上阻塞了管道。 - Panagiotis Kanavos
@Panagiotis 谢谢您的回答!我现在完全困惑了。我确实理解流经管道的数据包含故障,并且我很乐意利用它使我的代码尽可能简单。所以您的意思是,如果resultsBlock中的消息包含故障,我应该将其重定向到某个BufferBlock吗?如果我听起来很蠢,请原谅我。这对我来说是新东西。 - niks
看看这个:ITargetBlock<TInput>中的重试策略。实现带有重试功能的数据流块相当棘手,因为一些必须选项会产生固有的困难(EnsureOrderedBoundedCapacity),而且也不明显如何强制执行同一项的重复尝试之间的特定延迟。但是它是可行的。 - Theodor Zoulias
@Theodor 谢谢你的回答。我会查看你提供的链接。看起来需要掌握的东西很多。 - niks
1个回答

3
我将一个重试块实现从类似问题的回答中转换为支持Stephen Cleary的Try类型作为输入和输出。方法CreateRetryTransformBlock返回一个TransformBlock<Try<TInput>, Try<TOutput>>,而方法CreateRetryActionBlock返回一个几乎是ActionBlock<Try<TInput>>的东西。
此外,还有三个选项可用,即MaxAttemptsPerItemMinimumRetryDelayMaxRetriesTotal,以及标准执行选项
public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The minimum delay duration before retrying an item.</summary>
    public TimeSpan MinimumRetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static TransformBlock<Try<TInput>, Try<TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.MinimumRetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MinimumRetryDelay));

    var internalCTS = CancellationTokenSource
        .CreateLinkedTokenSource(dataflowBlockOptions.CancellationToken);

    var maxDOP = dataflowBlockOptions.MaxDegreeOfParallelism;
    var taskScheduler = dataflowBlockOptions.TaskScheduler;

    var exceptionsCount = 0;
    SemaphoreSlim semaphore;
    if (maxDOP == DataflowBlockOptions.Unbounded)
    {
        semaphore = new SemaphoreSlim(Int32.MaxValue);
    }
    else
    {
        semaphore = new SemaphoreSlim(maxDOP, maxDOP);

        // The degree of parallelism is controlled by the semaphore
        dataflowBlockOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        // Use a limited-concurrency scheduler for preserving the processing order
        dataflowBlockOptions.TaskScheduler = new ConcurrentExclusiveSchedulerPair(
            taskScheduler, maxDOP).ConcurrentScheduler;
    }

    var block = new TransformBlock<Try<TInput>, Try<TOutput>>(async item =>
    {
        // Continue on captured context after every await
        if (item.IsException) return Try<TOutput>.FromException(item.Exception);
        var result1 = await ProcessOnceAsync(item);
        if (item.IsException || result1.IsValue) return result1;
        for (int i = 2; i <= maxAttemptsPerItem; i++)
        {
            await Task.Delay(retryDelay, internalCTS.Token);
            var result = await ProcessOnceAsync(item);
            if (result.IsValue) return result;
        }
        return result1; // Return the first-attempt exception
    }, dataflowBlockOptions);

    dataflowBlockOptions.MaxDegreeOfParallelism = maxDOP; // Restore initial value
    dataflowBlockOptions.TaskScheduler = taskScheduler; // Restore initial value

    _ = block.Completion.ContinueWith(_ => internalCTS.Dispose(),
        TaskScheduler.Default);

    return block;

    async Task<Try<TOutput>> ProcessOnceAsync(Try<TInput> item)
    {
        await semaphore.WaitAsync(internalCTS.Token);
        try
        {
            var result = await item.Map(transform);
            if (item.IsValue && result.IsException)
            {
                ObserveNewException(result.Exception);
            }
            return result;
        }
        finally
        {
            semaphore.Release();
        }
    }

    void ObserveNewException(Exception ex)
    {
        if (maxRetriesTotal == -1) return;
        uint newCount = (uint)Interlocked.Increment(ref exceptionsCount);
        if (newCount <= (uint)maxRetriesTotal) return;
        if (newCount == (uint)maxRetriesTotal + 1)
        {
            internalCTS.Cancel(); // The block has failed
            throw new RetryLimitException($"The max retry limit " +
                $"({maxRetriesTotal}) has been reached.", ex);
        }
        throw new OperationCanceledException();
    }
}

public static ITargetBlock<Try<TInput>> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<Try<object>>();
    block.LinkTo(nullTarget);
    return block;
}

使用示例:

var downloadBlock = CreateRetryTransformBlock(async (int construct) =>
{
    int result = await DownloadAsync(construct);
    return result;
}, new RetryExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10,
    MaxAttemptsPerItem = 3,
    MaxRetriesTotal = 100,
    MinimumRetryDelay = TimeSpan.FromSeconds(10)
});

var processBlock = new TransformBlock<Try<int>, Try<int>>(
    construct => construct.Map(async value =>
{
    return await ProcessAsync(value);
}));

downloadBlock.LinkTo(processBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });

为了简化问题,如果一个项目已经重试了最大次数,保留的异常是第一次发生的异常。随后的异常会丢失。在大多数情况下,丢失的异常会与第一个异常相同。

注意:上述实现没有一个有效的输入队列。如果您将这个块用数百万个项目进行输入,内存使用量将会急剧增加。


1
嗨,Theodor!谢谢你的回答。当我深入研究时,我一定会向你提出一些问题! - niks
所以,为了明确起见,如果我将 var data = new List<string>(){a,b,c,d} 推入管道中,如果 "a" 失败,那么在等待 retryDelay 期间,可能会发生 SemaphoreSlim 允许其他条目进入的情况,比如说 "b" 继续进行,并且 SemaphoreSlim 将第一项 "a" 放在队列的末尾? - niks
我想测试这个行为,但是设置new RetryExecutionDataflowBlockOptions(){MaxAttemptsPerItem = 3, RetryDelay=TimeSpan.FromSeconds(10), EnsureOrdered = false });并不会让其他项排在失败的项前面。最终输出仍然是a、b、c、d。 - niks
1
对不起,那是我的错误。现在可以了。谢谢你的帮助,我现在会继续进行进一步的测试! - niks
1
啊,是的。我原本认为你可以使用原始的 CancellationToken 来实现这个功能,但你不能在其上调用 Cancel 方法。你需要使用 CancellationTokenSource。这是我的一个愚蠢疏忽。我想我现在已经回答完这个问题了。非常感谢你的巨大努力!即使在工作时间之外,也能如此详细地回答所有问题并跟进!:) 这感觉就像高级支持套餐!Efcharistó!:) - niks
显示剩余16条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接