在TPL数据流图中,是否有一种习惯用法来路由在TransformBlock中失败的元素?

4
我使用TPL数据流创建了一个输入元素的缓冲块,这些元素由TransformBlock进行处理,并输出到一个输出缓冲块。
inputQueue = new BufferBlock<InputPacket>;
processQueue = new TransformBlock <InputPacket, OutputPacket>;
outputQueue = new BufferBlock<OutputPacket>;

inputQueue.LinkTo(processQueue, new DataflowLinkOptions { PropagateCompletion = true });
processQueue.LinkTo(outputQueue, new DataflowLinkOptions { PropagateCompletion = true });

有没有一种惯用的方法来路由失败的元素?
当InputPacket元素完成处理时,与processQueue关联的操作将返回一个OutputPacket,该输出将被路由到outputQueue。
如果与processQueue关联的操作调用不可靠的Web服务,则某些InputPacket元素的处理将超时,并且我想重试这些元素x次。但我不想立即尝试它们,我想将它们放回输入队列。
我希望能够将超时的InputPacket元素路由回inputQueue,直到它们失败了x次,然后路由到failureQueue:
BufferBlock<CallPacket> failureQueue = new BufferBlock<InputPacket>;

使用LinkTo谓词的复杂性在于涉及到两种不同的类型:

InputPacket OutputPacket

看起来我可以通过更改以下内容来处理这个问题:

processQueue = new TransformBlock <InputPacket, ParentPacketType>;

接着基于数据包类型编写谓词。

或者

通过将输出存储在InputPacket的成员变量中的方式存储在inputElement中,但两种方式似乎都不是一个好的实现方法。


1
这个项目包含LinkSubTypeTo,看起来是为了这个目的而设计的:https://github.com/gridsum/DataflowEx - compound eye
1个回答

3

首先,我认为你的术语用词让人困惑,你应该坚持使用TPL Dataflow所使用的术语。数据流图不是由队列组成的,而是由组成的。这些块处理的是数据元素而不是块本身。

现在,当TPL Dataflow没有提供您想要的块时,其中一个解决方案是从提供的块中构建出自己需要的块。简单版本的块可以看起来像这样:

public static IPropagatorBlock<TInput, TOutput> CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform, int retryCount,
    ITargetBlock<(TInput, Exception)> failureBlock)
{
    var failedInputs = new Dictionary<TInput, int>();

    TransformManyBlock<TInput, TOutput> resultBlock = null;

    resultBlock = new TransformManyBlock<TInput, TOutput>(
        async input =>
        {
            try
            {
                return new[] { transform(input) };
            }
            catch (Exception exception)
            {
                failedInputs.TryGetValue(input, out int count);

                if (count < retryCount)
                {
                    failedInputs[input] = count + 1;
                    // ignoring the returned Task, to avoid deadlock
                    _ = resultBlock.SendAsync(input);
                }
                else
                {
                    failedInputs.Remove(input);
                    await failureBlock.SendAsync((input, exception));
                }

                return Array.Empty<TOutput>();
            }
        });

    return resultBlock;
}

我做出的假设:

  • 你可以使用C# 7.0。如果不行,我用到的功能很容易替换。
  • 如果忽略除了最后一个异常以外的所有异常,那么就没问题。否则,Dictionary 将不得不存储所有先前的异常,然后将它们发送到failureBlock
  • 将失败的数据元素发送回同一块是可以的。否则,该方法将不得不取一个更多的参数并用它来实现。
  • 块不需要支持并行处理。如果需要,您必须使代码线程安全(您可能会首先使用 ConcurrentDictionary 而不是 Dictionary)。
  • 输入数据元素可以存储在字典中(即:其GetHashCode的行为正确),且没有重复的输入。否则,您将不得不设计某种其他机制来计数重试。

我将使用这种方法进行工作,如果对我有效,我会回来打勾,加油! - compound eye

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接