TPL Dataflow加速?

5
我想知道以下代码是否能被优化以提高执行速度。我目前似乎在一个相当简单的数据流结构上最多只能达到每秒140万个简单消息。我知道这个示例进程同步地传递/转换消息,但是我目前正在测试TPL Dataflow作为我的自定义任务和并发集合解决方案的可能替代品。我知道“并发”这个术语已经表明我要并行运行事物,但是为了进行当前的测试目的,我使用自己的解决方案同步地推送消息,我达到了每秒510万条消息。我错过了什么吗?我读到TPL Dataflow被推作高吞吐量、低延迟的解决方案,但到目前为止,我肯定忽略了性能调整。请问有谁能指导我正确的方向?
class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        buf1.LinkTo(transform);
        transform.LinkTo(action);

        //Propagate all Completions down the flow
        buf1.Completion.ContinueWith(t =>
        {
            transform.Complete();
            transform.Completion.ContinueWith(u =>
            {
                action.Complete();
            });
        });

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete();

        action.Completion.ContinueWith(t =>
            {
                watch.Stop();

                Console.WriteLine("All Blocks finished processing");
                Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
            });

        Console.ReadLine();
    }
}
4个回答

9
我认为这主要归结于一件事:你的测试基本上是没有意义的。所有这些块都应该做某些事情,并使用多个核心和异步操作来完成。
此外,在您的测试中,很可能会花费大量时间进行同步。使用更真实的代码,代码将需要一些时间来执行,因此争用会减少,因此实际开销将小于您测量的开销。
但是实际上回答您的问题,是的,您忽略了一些性能调整。具体来说,SingleProducerConstrained,这意味着可以使用更少锁定的数据结构。如果我在这两个块(BufferBlock 在这里完全无用,可以安全地删除它)上使用它,则在我的计算机上,速率从每秒约3-4百万个项目提高到超过500万个。

1
这是我在运行你的代码时在电脑上得到的数字。而我想说的是,你不能像这样比较性能,这不是真实性能的好表示。 - svick
我接受您的批评,感谢您的建议。我可以修改一些代码,运行更多的测试并发布更新。之后您介意再看一遍吗?非常感谢。 - Matt
1
当你发布它的时候,我会看一看。 - svick
我将您的解决方案标记为所需解决方案。我在IPropagatorBlock上做了很多尝试(还记得吗?你在我几天前提出的一个TPL Dataflow相关问题中提出了一个解决方案),并行运行Transform块中的操作(在大多数情况下应该是预期的),将SingleProducerConstraint设置为true(适用于我的项目),在所有LinkTo(...)中传播完成,并在每个数据块中进行实际工作,这表现出了巨大的超越我的当前框架。 - Matt
当前的框架能够在满负荷情况下处理约320万个项目,而使用您的IPropagatorBlock解决方案的TPL Dataflow则将速度提高到每秒超过500万个项目。太棒了。这激起了我进一步深入研究TPL Dataflow内部工作的兴趣。 - Matt

2
为了补充svick的回答,该测试仅使用单个处理线程来执行单个操作块。这样测试的内容仅限于使用块的开销。
DataFlow的工作方式类似于F# Agents、Scala actors和MPI实现。每个操作块一次只执行一个任务,监听输入并生成输出。通过将算法分解为可以在多个核心上独立执行的步骤,并仅相互传递消息来提供加速。
虽然您可以增加并发任务的数量,但最重要的问题是设计流程以独立地执行尽可能多的步骤。

1
谢谢您的评论,但我认为您想说的是数据块而不是操作块,对吗?操作块不会产生输出。 - Matt

0

您还可以增加数据流块的并行度。这可能会提供额外的加速,并且如果您发现其中一个块成为其他块的瓶颈,则可以帮助负载平衡线性任务。


0
如果您的工作负载非常细粒度,预计每秒处理数百万条消息,则通过管道传递单个消息变得不可行,因为会带来额外的开销。您需要通过将消息分批到数组或列表中来对工作负载进行分块处理。例如:
var transform = new TransformBlock<int[], string[]>(batch =>
{
    var results = new string[batch.Length];
    for (int i = 0; i < batch.Length; i++)
    {
        results[i] = ProcessItem(batch[i]);
    }
    return results;
});

如果要批处理您的输入,您可以使用BatchBlock,或者来自System.Interactive包的“linqy”Buffer扩展方法,或者来自MoreLinq包的类似功能的Batch方法,或手动执行。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接