如何使用TPL Dataflow库指定无序执行块?

6
我想设置一个并行处理其项目的TransformBlock。因此,我将ExecutionDataflowBlockOptions.MaxDegreeOfParallelism设置为> 1。我不关心消息的顺序,但是文档说:

当您指定大于1的最大并行度时,多个消息将同时处理,因此,消息可能不按接收顺序处理。然而,从块输出消息的顺序将被正确排序。

"正确排序"是否意味着如果队列中有一个需要长时间处理的消息,则在处理此消息之前不会输出其他消息?

如果是这样,那么我该如何指定执行块(例如TransformBlock),以使其不关心顺序?还是我必须在消费端指定我不关心顺序?


相关:理解TPL Dataflow的并行度排序 - Theodor Zoulias
2个回答

5

在库中没有这样的块,但是您可以通过组合ActionBlockBufferBlock来轻松创建一个。类似这样:

public static IPropagatorBlock<TInput, TOutput>
    CreateUnorderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> func, ExecutionDataflowBlockOptions options)
{
    var buffer = new BufferBlock<TOutput>(options);
    var action = new ActionBlock<TInput>(
        async input =>
        {
            var output = func(input);
            await buffer.SendAsync(output);
        }, options);

    action.Completion.ContinueWith(
        t =>
        {
            IDataflowBlock castedBuffer = buffer;

            if (t.IsFaulted)
            {
                castedBuffer.Fault(t.Exception);
            }
            else if (t.IsCanceled)
            {
                // do nothing: both blocks share options,
                // which means they also share CancellationToken
            }
            else
            {
                castedBuffer.Complete();
            }
        });

    return DataflowBlock.Encapsulate(action, buffer);
}

这种方式,一旦ActionBlock处理完一个项目,它就立即移动到BufferBlock,这意味着不维护顺序。

这段代码的一个问题是它不能很好地观察设置的BoundedCapacity:实际上,该块的容量是选项中设置容量的两倍(因为每个块都有单独的容量)。


谢谢!这应该作为一个示例用于 https://msdn.microsoft.com/en-us/library/hh228606(v=vs.110).aspx,因为它清楚地表明封装对于改变数据流的工作方式非常有用。 - nlawalker
2
EnsureOrdered在2016年被添加到DataflowBlockOptions中,TransformBlock和TransformManyBlock实现了它。对于无序的TransformBlock,在DataflowBlockOptions中将EnsureOrdered设置为false。https://github.com/dotnet/corefx/pull/5191 - NPNelson

0

(将NPNelson评论升级为答案)

DataflowBlockOptions类包含一个可配置属性EnsureOrdered(在2016年引入),该属性确定接收到的消息的顺序是否会保留在块的输出中。默认情况下,此属性为true。将此属性设置为false会使块在处理完消息后立即传播消息,从而增加管道的吞吐量,因为传播速度更快且开销减少。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接