我有一串TPL数据流块,并希望在系统内某处观察进度。我知道可以在想要观察的网格中添加TransformBlock,让其发布到某个进度更新器并将消息未更改地返回给下一个块。但我不喜欢这个解决方案,因为块仅用于副作用,而且我还必须更改任何想要观察的块连接逻辑。因此,我想知道是否可以使用ISourceBlock<T>.AsObservable观察网格内传递的消息,而不会更改它或消耗消息。如果可行,这似乎是更纯粹和更实用的解决方案。根据我对Rx的(有限)理解,这意味着我需要使可观察对象处于热状态而不是冷态,以便我的进度更新器看到消息但不会消耗它。.Publish().RefCount()似乎是使可观察对象处于热状态的方法。然而,它并没有按预期工作-相反,block2或progress会接收并消耗每条消息。
那么,我是做错了什么,还是因为TPL Dataflow的AsObservable实现方式而无法实现这一点?
我知道我也可以用Observable/Observer对替换block1和block2之间的LinkTo,这可能会起作用,但是使用具有下游BoundedCapacity = 1的LinkTo正是我首先使用TPL Dataflow的原因。
编辑: 一些澄清:
- 我确实打算在block2中设置BoundedCapacity = 1。虽然在这个简单的例子中是不必要的,但受限下游的情况是我发现TPL Dataflow真正有用的地方。 - 为了澄清我在第二段拒绝的解决方案,它将是添加以下程序块,链接在block1和block2之间: ``` var progressBlock = new TransformBlock( i => {SomeUpdateProgressMethod(i); return i;}); ``` - 我还希望保持反向压力,以便如果更上游的块正在分配工作给block1和其他等效的工作者,那么如果该链已经忙碌,它不会向block1发送工作。
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});
// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
block1.Post(v);
}
block1.Complete();
结果是不确定的,但我得到了类似这样混合的东西:
block2:21
progress:22
progress:24
block2:23
progress:25
那么,我是做错了什么,还是因为TPL Dataflow的AsObservable实现方式而无法实现这一点?
我知道我也可以用Observable/Observer对替换block1和block2之间的LinkTo,这可能会起作用,但是使用具有下游BoundedCapacity = 1的LinkTo正是我首先使用TPL Dataflow的原因。
编辑: 一些澄清:
- 我确实打算在block2中设置BoundedCapacity = 1。虽然在这个简单的例子中是不必要的,但受限下游的情况是我发现TPL Dataflow真正有用的地方。 - 为了澄清我在第二段拒绝的解决方案,它将是添加以下程序块,链接在block1和block2之间: ``` var progressBlock = new TransformBlock( i => {SomeUpdateProgressMethod(i); return i;}); ``` - 我还希望保持反向压力,以便如果更上游的块正在分配工作给block1和其他等效的工作者,那么如果该链已经忙碌,它不会向block1发送工作。
block2
和progress
。我的猜测是,block2
没有被视为一个Observer
,因为它与block1
的链接不是以RX方式完成的,而是TPL Dataflow在内部实现了LinkTo
(和AsObservable
)。因此,我们无法成功地进行MultiCast
,因为需要在block1
内部设置这样的设置。这听起来正确吗? - theStrawManForEachAsync
方法调用中创建的(您应该使用.Subscribe
代替)。 - EnigmativityLinkTo
谓词将每个消息发布到一个observable中,然后返回true。为了隐藏这种不纯性,可能可以创建一个扩展方法ObservableLinkTo
,它创建observable并同时调用常规的LinkTo
。 - theStrawMan