这适合使用TPL Dataflow吗?

12

我在不同的任务上运行了一个相当典型的生产者/消费者模型。

任务1:从二进制文件中读取byte[]批次,并为每个byte数组集合启动一个新任务(该操作进行批处理以进行内存管理)。

任务2-n:这些是工作任务,每个任务都在传入的byte数组集合(来自任务1)上操作,对byte数组进行反序列化,按某些标准对它们进行排序,然后将结果对象的集合(每个byte数组反序列化为这样的对象)存储在Concurrent Dictionary中。

任务(n+1)我选择了一个并发字典,因为这个任务的工作是合并那些以与Task1源自相同顺序存储在并发字典中的集合。我通过从Task1一直传递一个collectionID(它是int类型的,并针对Task1中的每个新集合递增)来实现这一点。这个任务基本上检查下一个预期的集合ID是否已经存储在并发字典中,如果是,则将其取出,添加到Final Queue中,并检查并发字典中的下一个集合。

现在,根据我所读的和观看的视频,TPL Dataflow似乎是这种生产者/消费者模型的完美候选库。但由于我从未使用过TPL Dataflow,因此似乎无法设计并开始。就吞吐量和延迟而言,这个库是否适合这个任务?我目前每秒处理250万个byte数组和对象的结果集合。TPL Dataflow是否可以帮助简化工作?我特别关注以下问题的答案:当产生工作任务并在工作任务完成后重新合并它们时,TPL Dataflow能否保留Task1的集合批次顺序?它是否优化了一些东西?经过整个结构的分析,我觉得由于旋转和涉及太多并发集合,浪费了相当多的时间。

有任何想法、思路吗?

2个回答

12
编辑:事实证明我错了。即使配置为并行处理,TransformBlock仍然按照它们进入的顺序返回项目。因此,我原始答案中的代码完全无用,可以使用普通的TransformBlock代替。
原始答案: 据我所知,在.Net中只有一个并行性构造支持按照它们进入的顺序返回处理过的项目:带有AsOrdered()的PLINQ。但是我认为PLINQ不适合您想要的东西。
另一方面,TPL Dataflow非常适合,但它没有支持同时支持并行性和按顺序返回项的块(TransformBlock支持这两个功能,但不能同时使用)。幸运的是,Dataflow块是以组合性为设计思路的,因此我们可以构建自己的块来实现这一点。
但首先,我们必须弄清楚如何对结果进行排序。像您建议的那样使用并发字典以及一些同步机制肯定会起作用。但是我认为有一个更简单的解决方案:使用一个Task队列。在输出任务中,您出队一个Task,等待它完成(异步地),并在完成后发送其结果。对于队列为空的情况,我们仍然需要一些同步,但是如果聪明地选择要使用的队列,我们可以免费获得它。
因此,总体思路如下:我们正在编写一个IPropagatorBlock,具有一些输入和一些输出。创建自定义IPropagatorBlock的最简单方法是创建一个处理输入的块,另一个生成结果的块,并将它们视为一个块,使用DataflowBlock.Encapsulate()进行处理。
输入块必须按正确的顺序处理传入的项目,因此不会并行化。它将创建一个新的Task(实际上是一个TaskCompletionSource,以便稍后设置Task的结果),将其添加到队列中,然后发送要处理的项目,以及一种设置正确的Task结果的方法。因为我们不需要将此块链接到任何东西,所以可以使用ActionBlock
输出块将需要从队列中获取任务,异步等待它们,然后将它们发送出去。但由于所有块都嵌入了队列,并且具有委托的块内置了异步等待,因此这将非常简单:new TransformBlock<Task<TOutput>, TOutput>(t => t)。该块将同时作为队列和输出块工作。因此,我们不必处理任何同步问题。
最后一部分拼图实际上是并行处理项目。为此,我们可以使用另一个 ActionBlock,这次设置了 MaxDegreeOfParallelism。它将接收输入,处理它,并将结果设置为队列中正确的 Task
组合起来,它可能看起来像这样:
public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

经过这么多的讨论,我认为这是相当少量的代码。

看起来你非常关心性能,因此您可能需要优化此代码。例如,将processor块的MaxDegreeOfParallelism设置为类似于Environment.ProcessorCount的值,以避免超额订阅。此外,如果对您而言延迟比吞吐量更重要,则可以将同一块的MaxMessagesPerTask设置为1(或另一个较小的数字),以便在完成项目处理时立即将其发送到输出。

此外,如果您想限制传入的项,可以将enqueuerBoundedCapacity设置为适当的值。


1
这在第二段有解释:TransformBlock 无法同时并行处理项目并按顺序返回它们。它可以执行其中一个操作,但不能同时执行两个操作。 - svick
1
这就是所有的代码了。我这里实际上没有使用 LinkTo(),而是在 enqueuer 中两次使用了 SendAsync()。我可以enqueuer 变成一个 TransformBlock 并使用一个 LinkTo() 和一个 SendAsync(),但我更喜欢这种方式,因为它“对称”。ActionBlock通常是一个终点,但其中的代码可以做任何你想做的事情,包括将项发送到其他块,这就是我所做的。 - svick
1
我先讲一下 ActionBlockenqueuer),然后是另一个块,它是 TransformBlockqueue),最后是另一个块,它是 ActionBlockprocessor)。 - svick
1
不需要实现任何接口。只需将该方法放入您的代码中并调用它(传递要在并行执行的函数),它将返回一个新块,执行您所请求的操作。为了测试它,您可以将其链接到另一个“ActionBlock”,然后使用“Post()”或“SendAsync()”添加要处理的项目。 - svick
我已将您的答案标记为所需解决方案,棒极了。不可否认,需要花费相当长的时间来完全理解、尝试并实现性能测试,但是我终于到达目标了,并且您的TPL Dataflow解决方案远远优于我之前使用的并发集合和大量模板代码的实现。真的很棒,这让我更有动力深入研究TPL Dataflow。 - Matt
显示剩余3条评论

0

是的,TPL Dataflow 库非常适合这项工作。它支持您需要的所有功能:MaxDegreeOfParallelismBoundedCapacityEnsureOrdered。但是使用 BoundedCapacity 选项需要注意一些细节。

首先,您必须确保将管道中的第一个块使用SendAsync方法进行输入。否则,如果您使用Post方法并忽略其返回值,则可能会丢失消息。SendAsync永远不会丢失消息,因为它会异步阻塞调用者,直到块的内部缓冲区中有空闲空间来接收传入消息。

其次,您必须确保下游块中的可能异常不会无限期地阻塞饲料器,等待永远不会到来的空闲空间。没有内置的方法可以通过配置块自动完成此操作。相反,您必须手动将下游块的完成情况传播到上游块。以下示例中的PropagateFailure方法就是这样的意图:

public static async Task ProcessAsync(string[] filePaths,
    ConcurrentQueue<MyClass> finalQueue)
{
    var reader = new TransformBlock<string, byte[]>(filePath =>
    {
        byte[] result = ReadBinaryFile(filePath);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // This is the default
        BoundedCapacity = 20, // keep memory usage under control
        EnsureOrdered = true // This is also the default
    });

    var deserializer = new TransformBlock<byte[], MyClass>(bytes =>
    {
        MyClass result = Deserialize(bytes);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 20
    });

    var writer = new ActionBlock<MyClass>(obj =>
    {
        finalQueue.Enqueue(obj);
    });

    reader.LinkTo(deserializer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(deserializer, reader); // Link backwards

    deserializer.LinkTo(writer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(writer, deserializer); // Link backwards

    foreach (var filePath in filePaths)
    {
        var accepted = await reader.SendAsync(filePath).ConfigureAwait(false);
        if (!accepted) break; // This will happen in case that the block has failed
    }
    reader.Complete(); // This will be ignored if the block has already failed

    await writer.Completion; // This will propagate the first exception that occurred
}

public static async void PropagateFailure(IDataflowBlock block1,
    IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); }
    catch (Exception ex) { block2.Fault(ex); }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接