TPL Dataflow块在PropagateCompletion上永远不会完成

5
自从我对我的传播完成流水线进行最后一次修改之后,我的缓冲块中的一个永远不会完成。让我总结一下以前工作和现在不再工作的内容:
先前正常工作的内容:
A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();

// everything completes

不再工作:


A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);

await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);

D.Receive();

// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either

如果我取消注释该行代码,一切都能正常工作,但显然这行代码不应该是必要的。
一些方式中,我的缓冲块B永远无法完成,即使与它相连的块已经完成并传播了其完成信号,并且与它相连的块已经接收到所有缓冲的项。

你应该展示你的实际代码。 - i3arnon
@i3arnon 我不能放整个代码,因为管道非常复杂,清理只保留相关部分需要很长时间,但更重要的是,我的管道中的所有内容都非常标准,所有块都链接在一起,一个东西进去一个东西出来。那些在await周围的行是唯一不寻常的事情,它们正是我所拥有的方式,我相信在我完美工作的管道的其余部分中没有任何东西可以解释取消注释的行使其工作,并且注释它会防止B完成。 - Luis Ferrao
你最终有没有成功呢? - Jacques Bosch
2个回答

2

通过等待 A 的完成,剩余的代码直到 A 完成才会执行。这就是 await 的工作方式——在它之后的代码被包装在一个续体中,准备好等待所等待的代码完成。因此,在这种情况下,B 与 A 关联在 A 完成后,因此不会传播完成状态。


首先,您可能是指A链接到B(而不是B链接到A),其次,您的说明仅确认等待A.Completion是问题,这显然我已经知道,但它并没有说明传播的完成状态发生了什么,它是否被忽略了? - Luis Ferrao

1
我遇到了同样的问题 - 我的最终块是一个TransformBlock,它只是等待完成。
如果一个块没有剩余的输入要处理,所有的输出已经被处理并且已经完成,那么这个块才会完成,可以通过调用block.Complete或者将完成传播到链中来完成。
我通过将最终块设置为ActionBlock来解决了这个问题,因为它不会产生任何输出。这是我的代码,它接受一系列文本文件并反序列化它们的内容:
Dim maxDop = Environment.ProcessorCount
Dim executionOptions = New ExecutionDataflowBlockOptions With {.MaxDegreeOfParallelism = maxDop}
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

Dim inputBlock = New BufferBlock(Of String)
Dim transformBlock = New TransformBlock(Of String, IEnumerable(of JsonObject))(Function(fileName) DeserialiseFromFileAsync(fileName)
Dim outputBlock As New BufferBlock(Of IEnumerable(of JsonObject))
Dim combineBlock = New ActionBlock(Of IEnumerable(of JsonObject))(Sub(col) ' Do something to add all the collections together)

inputBlock.LinkTo(transformBlock, linkOptions)
transformBlock.LinkTo(outputBlock, linkOptions)
outputBlock.LinkTo(combineBlock, linkOptions)

For fileNo = 1 To 10
    inputBlock.Post(String.Concat("JsonFile", fileNo, ".txt"))
Next

inputBlock.Complete() 'Complete the first block, propogation will handle the rest
Await combineBlock.Completion 'Await the last block completing

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接