当两个transformblocks都完成时,我如何重写代码才能使代码完成呢? 我认为完成意味着标记已完成并且“ out队列”为空?
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
我编辑了代码,在每个变换块中添加了一个输入缓冲计数。显然,所有100个项目都被流式传输到每个变换块中。但是,一旦其中一个变换块完成,处理器块就不再接受任何更多的项目,而是将不完整的变换块的输入缓冲区刷新。
SendAsync(i)
上使用await
。 - urbanhuskyTransformBlock
会错过消息。文档说明BroadcastBlock
保证在接受新项之前将其传播到所有链接的目标。如果目标上的BoundedCapacity
未绑定,则目标TransformBlock
将缓冲消息。现在,如果设置了BoundedCapacity
,则它将阻塞直到有容量,并且这可能会导致丢失消息,特别是使用未等待的SendAsync
。简而言之,我的理解是如果无限制,则不会丢失任何消息。 - Kevin Fichter