TPL数据流,核心设计令人困惑

4
我一直在使用TPL Dataflow,但是遇到了一个无法解决的问题:
我有以下架构:
BroadCastBlock<List<object1>> -> 2个不同的TransformBlock<List<Object1>, Tuple<int, List<Object1>>> -> 都链接到TransformManyBlock<Tuple<int, List<Object1>>, Object2>
我在链的末端变化TransformManyBlock中的lambda表达式:(a) 对流式元组执行操作的代码,(b) 没有任何代码。
在TransformBlocks内,我从第一个项目到达时开始计时,并在TransformBlock.Completion指示块完成时停止计时(broadCastBlock链接到具有propagateCompletion设置为true的transfrom blocks)。
我无法调和的是,在情况(b)下的transformBlocks比情况(a)下的快5-6倍。这完全违背了整个TDF设计意图的初衷。来自转换块的项目已传递到transfromManyBlock,因此,影响转换块完成时间的是transfromManyBlock对项目所做的任何事情都无关紧要。我没有看到任何一个原因,说明在transfromManyBlock中发生的任何事情都可能影响前面的TransformBlocks。
有人能调和这种奇怪的观察结果吗?
这里是一些代码以显示差异。运行代码时,请确保更改以下两行:
        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

to:

        tfb1.transformBlock.LinkTo(transformManyBlockEmpty);
        tfb2.transformBlock.LinkTo(transformManyBlockEmpty);

为了观察前面的transformBlocks在运行时间上的差异。
class Program
{
    static void Main(string[] args)
    {
        Test test = new Test();
        test.Start();
    }
}

class Test
{
    private const int numberTransformBlocks = 2;
    private int currentGridPointer;
    private Dictionary<int, List<Tuple<int, List<Object1>>>> grid;

    private BroadcastBlock<List<Object1>> broadCastBlock;
    private TransformBlockClass tfb1;
    private TransformBlockClass tfb2;

    private TransformManyBlock<Tuple<int, List<Object1>>, Object2> 
               transformManyBlock;
    private TransformManyBlock<Tuple<int, List<Object1>>, Object2> 
               transformManyBlockEmpty;
    private ActionBlock<Object2> actionBlock;

    public Test()
    {
        grid = new Dictionary<int, List<Tuple<int, List<Object1>>>>();

        broadCastBlock = new BroadcastBlock<List<Object1>>(list => list);

        tfb1 = new TransformBlockClass();
        tfb2 = new TransformBlockClass();

        transformManyBlock = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>
                (newTuple =>
            {
                for (int counter = 1; counter <= 10000000;  counter++)
                {
                    double result = Math.Sqrt(counter + 1.0);
                }

                return new Object2[0];

            });

        transformManyBlockEmpty 
            = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>(
                  tuple =>
            {
                return new Object2[0];
            });

        actionBlock = new ActionBlock<Object2>(list =>
            {
                int tester = 1;
                //flush transformManyBlock
            });

        //linking
        broadCastBlock.LinkTo(tfb1.transformBlock
                              , new DataflowLinkOptions 
                                  { PropagateCompletion = true }
                              );
        broadCastBlock.LinkTo(tfb2.transformBlock
                              , new DataflowLinkOptions 
                                  { PropagateCompletion = true }
                              );

        //link either to ->transformManyBlock or -> transformManyBlockEmpty
        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

        transformManyBlock.LinkTo(actionBlock
                                  , new DataflowLinkOptions 
                                       { PropagateCompletion = true }
                                  );
        transformManyBlockEmpty.LinkTo(actionBlock
                                       , new DataflowLinkOptions 
                                            { PropagateCompletion = true }
                                       );

        //completion
        Task.WhenAll(tfb1.transformBlock.Completion
                     , tfb2.transformBlock.Completion)
                       .ContinueWith(_ =>
            {
                transformManyBlockEmpty.Complete();
                transformManyBlock.Complete();
            });

        transformManyBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine("TransformManyBlock (with code) completed");
            });

        transformManyBlockEmpty.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("TransformManyBlock (empty) completed");
        });

    }

    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;


        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<Object1> list = new List<Object1>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(new Object1(j));
            }

            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();

        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}

class TransformBlockClass
{
    private Stopwatch watch;
    private bool isStarted;
    private int currentIndex;

    public TransformBlock<List<Object1>, Tuple<int, List<Object1>>> transformBlock;

    public TransformBlockClass()
    {
        isStarted = false;
        watch = new Stopwatch();

        transformBlock = new TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
           (list =>
        {
            if (!isStarted)
            {
                StartUp();
                isStarted = true;
            }

            return new Tuple<int, List<Object1>>(currentIndex++, list);
        });

        transformBlock.Completion.ContinueWith(_ =>
            {
                ShutDown();
            });
    }

    private void StartUp()
    {
        watch.Start();
    }

    private void ShutDown()
    {
        watch.Stop();

        Console.WriteLine("TransformBlock : Time elapsed in ms: " 
                              + watch.ElapsedMilliseconds);
    }
}

class Object1
{
    public int val { get; private set; }

    public Object1(int val)
    {
        this.val = val;
    }
}

class Object2
{
    public int value { get; private set; }
    public List<Object1> collection { get; private set; }

    public Object2(int value, List<Object1> collection)
    {
        this.value = value;
        this.collection = collection;
    }    
}

*编辑:我又发布了另一段代码,这次使用值类型的集合,我无法重现我在上面代码中观察到的问题。是不是传递引用类型并在它们之间同时操作(甚至在不同的数据流块中)可能会阻塞并引起争用?*

class Program
{
    static void Main(string[] args)
    {
        Test test = new Test();
        test.Start();
    }
}

class Test
{
    private BroadcastBlock<List<int>> broadCastBlock;
    private TransformBlock<List<int>, List<int>> tfb11;
    private TransformBlock<List<int>, List<int>> tfb12;
    private TransformBlock<List<int>, List<int>> tfb21;
    private TransformBlock<List<int>, List<int>> tfb22;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock1;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock2;
    private ActionBlock<List<int>> actionBlock1;
    private ActionBlock<List<int>> actionBlock2;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<List<int>>(item => item);

        tfb11 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb12 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb21 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb22 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        transformManyBlock1 = new TransformManyBlock<List<int>, List<int>>(item =>
            {
                Thread.Sleep(100);
                //or you can replace the Thread.Sleep(100) with actual work, 
                //no difference in results. This shows that the issue at hand is 
                //unrelated to starvation of threads.

                return new List<int>[1] { item };
            });

        transformManyBlock2 = new TransformManyBlock<List<int>, List<int>>(item =>
            {
                return new List<int>[1] { item };
            });

        actionBlock1 = new ActionBlock<List<int>>(item =>
            {
                //flush transformManyBlock
            });

        actionBlock2 = new ActionBlock<List<int>>(item =>
        {
            //flush transformManyBlock
        });

        //linking
        broadCastBlock.LinkTo(tfb11, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb12, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb21, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb22, new DataflowLinkOptions 
                                      { PropagateCompletion = true });

        tfb11.LinkTo(transformManyBlock1);
        tfb12.LinkTo(transformManyBlock1);
        tfb21.LinkTo(transformManyBlock2);
        tfb22.LinkTo(transformManyBlock2);

        transformManyBlock1.LinkTo(actionBlock1
                                   , new DataflowLinkOptions 
                                     { PropagateCompletion = true }
                                   );
        transformManyBlock2.LinkTo(actionBlock2
                                   , new DataflowLinkOptions 
                                     { PropagateCompletion = true }
                                   );

        //completion
        Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
            {
                Console.WriteLine("TransformBlocks 11 and 12 completed");
                transformManyBlock1.Complete();
            });

        Task.WhenAll(tfb21.Completion, tfb22.Completion).ContinueWith(_ =>
            {
                Console.WriteLine("TransformBlocks 21 and 22 completed");
                transformManyBlock2.Complete();
            });

        transformManyBlock1.Completion.ContinueWith(_ =>
            {
                Console.WriteLine
                    ("TransformManyBlock (from tfb11 and tfb12) finished");
            });

        transformManyBlock2.Completion.ContinueWith(_ =>
            {
                Console.WriteLine
                    ("TransformManyBlock (from tfb21 and tfb22) finished");
            });
    }

    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<int> list = new List<int>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(j);
            }

            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();

        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}

谢谢,我现在会看一下。 - casperOne
@casperOne,我添加了另一段代码,这次使用值类型的集合,可能是因为同时访问ref类型(即使在不同的数据流块中)会导致阻塞,从而导致延迟,即使在“更高层次”的数据块中也可能出现这种情况。 - Matt
糟糕,无法测试。最早要等到周一才能访问TPL数据流(安装有VS.NET 2012的机器坏了)...我正在努力! - casperOne
@casperOne,别担心,我忘记添加第二段代码会导致transformManyBlocks在每个块内执行多少工作时都在大约相同的时间内完成。我无法看出第一个代码库和第二个代码库之间有什么不同,可以解释第一个代码库中任务完成的停顿。 - Matt
1个回答

3

好的,最后一次尝试 ;-)

简介:

场景1中观测到的时间差异可以完全解释为垃圾回收器的行为不同。

在运行链接transformManyBlocks的场景1时,在主线程上创建新项目(列表)期间会触发垃圾收集,而在使用链接transformManyBlockEmptys运行场景1时则不是这种情况。

请注意,创建新的引用类型实例(Object1)会导致调用在GC堆中分配内存,从而可能触发GC收集运行。由于创建了相当多的Object1实例(和列表),因此垃圾收集器需要扫描堆以查找(可能)不可访问的对象。

因此,可以通过以下任何一种方式将观察到的差异最小化:

  • 将Object1从类转换为结构体(从而确保不在堆上分配实例的内存)。
  • 保持对生成的列表的引用(从而减少垃圾回收器需要识别不可访问对象的时间)。
  • 在将它们发布到网络之前生成所有项。

(注:我无法解释为什么垃圾收集器在场景1“transformManyBlock”与场景1“transformManyBlockEmpty”中的行为不同,但通过ConcurrencyVisualizer收集的数据清楚地显示了差异。)

结果:

(测试在Core i7 980X上运行,有6个核心和启用HT):

我对场景2进行了以下修改:

// Start a stopwatch per tfb
int tfb11Cnt = 0;
Stopwatch sw11 = new Stopwatch();
tfb11 = new TransformBlock<List<int>, List<int>>(item =>
{
    if (Interlocked.CompareExchange(ref tfb11Cnt, 1, 0) == 0)
        sw11.Start();

    return item;
});

// [...]

// completion
Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
{

     Console.WriteLine("TransformBlocks 11 and 12 completed. SW11: {0}, SW12: {1}",
     sw11.ElapsedMilliseconds, sw12.ElapsedMilliseconds);
     transformManyBlock1.Complete();
});

结果:

  1. 场景1(按原样发布,即链接到transformManyBlock):
    TransformBlock: 经过的时间(毫秒): 6826
    TransformBlock: 经过的时间(毫秒): 6826
  2. 场景1(链接到transformManyBlockEmpty):
    TransformBlock: 经过的时间(毫秒): 3140
    TransformBlock: 经过的时间(毫秒): 3140
  3. 场景1(transformManyBlock,循环体中的Thread.Sleep(200)):
    TransformBlock: 经过的时间(毫秒): 4949
    TransformBlock: 经过的时间(毫秒): 4950
  4. 场景2(按原样发布,但改为报告时间):
    完成转换块21和22。SW21: 619 毫秒,SW22: 669 毫秒
    完成转换块11和12。SW11: 669 毫秒,SW12: 667 毫秒

接下来,我更改了场景1和2,以便在将输入数据发布到网络之前准备它们:

// Scenario 1
//send collection numberBlock-times
var input = new List<List<Object1>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
    var list = new List<Object1>(collectionSize);
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(new Object1(j));
    }
    input.Add(list);
}

foreach (var inp in input)
{
    broadCastBlock.Post(inp);
    Thread.Sleep(10);
}

// Scenario 2
//send collection numberBlock-times
var input = new List<List<int>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
    List<int> list = new List<int>(collectionSize);
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(j);
    }

    //broadCastBlock.Post(list);
    input.Add(list);
 }

 foreach (var inp in input)
 {
     broadCastBlock.Post(inp);
     Thread.Sleep(10);
 }

结果:

  1. 场景 1(transformManyBlock)
    转换块:耗时 1029 毫秒
    转换块:耗时 1029 毫秒
  2. 场景 1(transformManyBlockEmpty)
    转换块:耗时 975 毫秒
    转换块:耗时 975 毫秒
  3. 场景 1(transformManyBlock,在循环体中使用 Thread.Sleep(200))
    转换块:耗时 972 毫秒
    转换块:耗时 972 毫秒

最后,我将代码改回了原始版本,但保留了所创建列表的引用:

var lists = new List<List<Object1>>();
for (int i = 0; i < numberBlocks; i++)
{
    List<Object1> list = new List<Object1>();
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(new Object1(j));
    }
    lists.Add(list);                
    broadCastBlock.Post(list);
}

结果:

  1. 场景1(transformManyBlock)
    TransformBlock:经过6052毫秒的时间
    TransformBlock:经过6052毫秒的时间
  2. 场景1(transformManyBlockEmpty)
    TransformBlock:经过5524毫秒的时间
    TransformBlock:经过5524毫秒的时间
  3. 场景1(transformManyBlock,循环体中使用Thread.Sleep(200))
    TransformBlock:经过5098毫秒的时间
    TransformBlock:经过5098毫秒的时间

同样地,将Object1从类更改为结构会导致两个块以大约相同的时间完成(并且快了约10倍)。


更新:以下答案不足以解释观察到的行为。

在场景一中,TransformMany lambda内部执行一个紧密的循环,这将占用CPU并使其他线程无法获得处理器资源。这就是为什么可以观察到执行Completion continuation任务时出现延迟的原因。在场景二中,TransformMany lambda内部执行Thread.Sleep,使其他线程有机会执行Completion continuation任务。观察到的运行时行为差异与TPL Dataflow无关。要改进观察到的差异,应在场景1的循环体内引入Thread.Sleep:

for (int counter = 1; counter <= 10000000;  counter++)
{
   double result = Math.Sqrt(counter + 1.0);
   // Back off for a little while
   Thread.Sleep(200);
}

你确定你正在测量正确的东西吗?请注意,当你执行像这样的操作:transformBlock.Completion.ContinueWith(_ => ShutDown());时,你的时间测量将受到TaskScheduler行为的影响(例如,直到继续任务开始执行需要多长时间)。虽然我无法在我的计算机上观察到你看到的差异,但是当使用专用线程来测量时间时,我得到了更精确的结果(以tfb1和tfb2完成时间之间的差异为准)。
       // Within your Test.Start() method...
       Thread timewatch = new Thread(() =>
       {
           var sw = Stopwatch.StartNew();
           tfb1.transformBlock.Completion.Wait();
           Console.WriteLine("tfb1.transformBlock completed within {0} ms",
                              sw.ElapsedMilliseconds);
        });

        Thread timewatchempty = new Thread(() =>
        {
            var sw = Stopwatch.StartNew();
            tfb2.transformBlock.Completion.Wait();
            Console.WriteLine("tfb2.transformBlock completed within {0} ms", 
                               sw.ElapsedMilliseconds);
        });

        timewatch.Start();
        timewatchempty.Start();

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
          // ... rest of the code

1
同样的原因:您的代码在执行紧密的for循环时会导致处理器过载。在此期间,线程调度程序决定其中一个已安排完成任务可以运行,然后授予执行循环的繁忙线程更多的CPU时间。然后一段时间后,它决定应该运行其他已安排的完成任务。另一方面,在情况2中,没有任何事情发生,执行TransformMany lambda的线程只是在睡觉,因此线程调度器决定完成任务可以立即运行。 - afrischke
嗯,我怀疑这不是问题所在。对于场景II,之前我让transformManyBlock工作而不是让它休眠,但没有任何区别。你可以尝试自己将Thread.Sleep(x)替换为一些简单的使CPU保持忙碌的东西。你会发现所有的transformBlocks都在完全相同的时间完成。 - Matt
没问题。只需在循环中加入一个睡眠(或者像示例二一样替换整个循环),看看是否会改变观察到的行为。如果有改变,那么我的解释很可能是正确的;如果没有改变,那么你就证明了我是错误的。 - afrischke
你的答案目前绝对是错误的。我已经告诉过你,你可以用实际工作替换Thread.Sleep,你会发现没有任何变化。关于线程饥饿的论点在这里不适用。 - Matt
我这么说是因为两个“transformBlocks”在大约相同的时间内都被提供了完全相同的项目,在本例中,两者执行了相同的工作。为什么我发送项目更快或更慢会有完成时间差异?现在它告诉我的只是部分开销来自准备项目(我以前就知道),但为什么实时创建项目会有差异呢?对结果感到困惑... - Matt
显示剩余7条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接