使用AsObservable观察TPL Dataflow块而不消费消息

9
我有一串TPL数据流块,并希望在系统内某处观察进度。我知道可以在想要观察的网格中添加TransformBlock,让其发布到某个进度更新器并将消息未更改地返回给下一个块。但我不喜欢这个解决方案,因为块仅用于副作用,而且我还必须更改任何想要观察的块连接逻辑。因此,我想知道是否可以使用ISourceBlock<T>.AsObservable观察网格内传递的消息,而不会更改它或消耗消息。如果可行,这似乎是更纯粹和更实用的解决方案。根据我对Rx的(有限)理解,这意味着我需要使可观察对象处于热状态而不是冷态,以便我的进度更新器看到消息但不会消耗它。.Publish().RefCount()似乎是使可观察对象处于热状态的方法。然而,它并没有按预期工作-相反,block2或progress会接收并消耗每条消息。
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

结果是不确定的,但我得到了类似这样混合的东西:

block2:21
progress:22
progress:24
block2:23
progress:25

那么,我是做错了什么,还是因为TPL Dataflow的AsObservable实现方式而无法实现这一点?
我知道我也可以用Observable/Observer对替换block1和block2之间的LinkTo,这可能会起作用,但是使用具有下游BoundedCapacity = 1的LinkTo正是我首先使用TPL Dataflow的原因。
编辑: 一些澄清:
- 我确实打算在block2中设置BoundedCapacity = 1。虽然在这个简单的例子中是不必要的,但受限下游的情况是我发现TPL Dataflow真正有用的地方。 - 为了澄清我在第二段拒绝的解决方案,它将是添加以下程序块,链接在block1和block2之间: ``` var progressBlock = new TransformBlock( i => {SomeUpdateProgressMethod(i); return i;}); ``` - 我还希望保持反向压力,以便如果更上游的块正在分配工作给block1和其他等效的工作者,那么如果该链已经忙碌,它不会向block1发送工作。

我并不完全理解,但是多个观察者确实是我想要的,即block2progress。我的猜测是,block2没有被视为一个Observer,因为它与block1的链接不是以RX方式完成的,而是TPL Dataflow在内部实现了LinkTo(和AsObservable)。因此,我们无法成功地进行MultiCast,因为需要在block1内部设置这样的设置。这听起来正确吗? - theStrawMan
1
是的,这样听起来更好。问题在于您的代码中可观察对象只有一个观察者。数据流块不是您可观察对象的观察者。您唯一拥有的观察者是在 ForEachAsync 方法调用中创建的(您应该使用 .Subscribe 代替)。 - Enigmativity
@theStrawMan 你尝试将块设置为非贪婪模式而不是限制它们的容量吗?这可能是你想要的,因为如果块忙碌,它不会请求更多的消息。 - VMAtm
@user4388177 - 这是一段时间之前的事情了,但我相当确定我只是使用了一个hacky方法,其中我让LinkTo谓词将每个消息发布到一个observable中,然后返回true。为了隐藏这种不纯性,可能可以创建一个扩展方法ObservableLinkTo,它创建observable并同时调用常规的LinkTo - theStrawMan
1
@theStrawMan 谢谢。我跟一个朋友说了,他正在实现几个新的区块,这将允许这样做。我会叫他在这里发布它们。 - user4388177
显示剩余9条评论
4个回答

5
您的代码存在问题,原因是您将block1的两个消费者连接在一起。Dataflow只会向其中一个消费者提供值。
因此,您需要将block1的值广播到另外两个块中,以便能够独立地消费这些值。
顺便提一下,不要使用.Publish().RefCount(),因为它并不做你想象的那样。它实际上会生成一个仅运行一次的Observable,在该运行期间允许多个观察者连接并查看相同的值。它与数据源以及Dataflow块的交互方式无关。
请尝试以下代码:
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

这给了我:

block2:21
block2:22
block2:23
block2:24
block2:25
progress:21
progress:22
progress:23
progress:24
progress:25

这应该是您想要的。

顺便提一下,对于此操作使用Rx可能是更好的选择。它比任何TPL或Dataflow选项都更强大且声明式。

您的代码归结为以下内容:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()));
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

那基本上会得到相同的结果。

@theStrawMan - 缓冲区块只是为了可观察对象而存在的。它不应该影响你的其他数据流程。 - Enigmativity
啊,抱歉我看错了。我需要测试一下你的解决方案。我承认我实际上不知道broadcast_block是否有丢失消息的风险,如果block2拒绝它,broadcast_block的当前消息是否会被替换掉?我希望不会,但我不确定。如果不会,那么这确实是一个合理的解决方案。 - theStrawMan
我刚刚测试了一下,不幸的是它确实丢失了消息。我以为 BroadcastBlock 可能会被实现成在背压情况下向上游块传递“我已满”的信息,这样它们就不会再发送另一个消息了,但事实并非如此。 - theStrawMan
@theStrawMan - 尝试使用 Rx。它会创建反压,但不会丢失值。 - Enigmativity
哦?我的理解是 Rx.NET 没有 反压功能,至少生产者不会意识到下游的阻塞并调整行为。虽然这有点偏离我在这里最初的问题。 - theStrawMan
显示剩余2条评论

2
创建可观察的数据流块时,有两个选项需要考虑。您可以选择:
  1. 每次处理消息时发出通知,或
  2. 当存储在块的输出缓冲区中的先前处理过的消息被链接块接受时发出通知。
这两个选项都有优点和缺点。第一种选项提供及时但无序的通知。第二种选项提供有序但延迟的通知,并且还必须处理块与块之间链接的可处置性。如果在块完成之前手动取消两个块之间的链接,应该如何处理可观察对象?
以下是第一种选项的实现,它创建了一个TransformBlock以及该块的非消耗IObservable。还有一个基于第一个实现的ActionBlock相当的实现(尽管也可以通过复制和调整TransformBlock实现来独立实现,因为代码并不多)。
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var semaphore = new SemaphoreSlim(1);
    int startedIndexSeed = 0;
    int completedIndexSeed = 0;

    var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
        new DataflowBlockOptions() { BoundedCapacity = 100 });

    var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        var startedIndex = Interlocked.Increment(ref startedIndexSeed);
        var result = await transform(item).ConfigureAwait(false);
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // Send the notifications in synchronized fashion
            var completedIndex = Interlocked.Increment(ref completedIndexSeed);
            await notificationsBlock.SendAsync(
                (item, result, startedIndex, completedIndex)).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
        return result;
    }, dataflowBlockOptions);

    _ = transformBlock.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
        else notificationsBlock.Complete();
    }, TaskScheduler.Default);

    observable = notificationsBlock.AsObservable();
    // A dummy subscription to prevent buffering in case of no external subscription.
    observable.Subscribe(
        DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
        out observable, dataflowBlockOptions);
}

// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Func<TInput, Task> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateObservableTransformBlock<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        out var sourceObservable, dataflowBlockOptions);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    observable = sourceObservable
        .Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Action<TInput> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableActionBlock(
        item => { action(item); return Task.CompletedTask; },
        out observable, dataflowBlockOptions);
}

在 Windows Forms 中的使用示例:

private async void Button1_Click(object sender, EventArgs e)
{
    var block = CreateObservableTransformBlock((int i) => i + 20,
        out var observable,
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

    var vals = Enumerable.Range(1, 20).ToList();
    TextBox1.Clear();
    ProgressBar1.Value = 0;

    observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
    {
        TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
        ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
    }, onError: ex =>
    {
        TextBox1.AppendText($"An exception occured: {ex.Message}\r\n");
    },
    onCompleted: () =>
    {
        TextBox1.AppendText("The job completed successfully\r\n");
    });

    block.LinkTo(DataflowBlock.NullTarget<int>());

    foreach (var i in vals) await block.SendAsync(i);
    block.Complete();
}

在上面的例子中,observable变量的类型是:
IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>

两个索引都是从1开始计数的。

0

尝试替换:

obs.ForEachAsync(i => Debug.Print("progressBlock:" + i.ToString()));

使用:

obs.Subscribe(i => Debug.Print("progressBlock:" + i.ToString()));

我想象中 ForEachAsync 方法没有正确挂钩/触发,但是异步部分出现了一些奇怪的问题。


谢谢您的建议,但是我得到了完全相同的行为。 - theStrawMan
糟糕,看到@VMAtm的回答了,也许你可以将这两个块解耦,使用你的observable作为唯一的消费者,然后将观察到的值输入到第二个块中?这样,你就可以尽情地使用观察到的值了。 - Clint

0
通过为链中的块指定BoundedCapacity,您创建了这样一种情况:目标块拒绝了一些消息,因为ActionBlock的缓冲区已满,并且消息被拒绝。
通过从缓冲块创建可观察对象,您提供了一种竞争条件:有两个数据消费者同时获取消息。 TPL Dataflow中的块将数据传播到第一个可用的消费者,这会导致应用程序处于不确定状态。
现在,回到您的问题。您可以引入BroadcastBlock,因为它向所有消费者提供数据的副本,而不仅仅是第一个消费者,但在这种情况下,您必须删除缓冲区大小限制,广播块就像电视频道,您无法获得以前的节目,只能看到当前的节目。
附注:您没有检查Post方法的返回值,您可以考虑使用await SendAsync,并为起始点块设置BoundedCapacity以获得更好的调节效果,而不是为中间块设置。

谢谢,我的数据流系统是下游受限的事实是有意为之的 - 在我看来,这是 Dataflow 库发挥作用的一个用例。我会添加一条相关说明。因此,我不会删除 BoundedCapacity=1 设置,并且 BroadcastBlock 也不适合。 - theStrawMan
仍然不明白为什么需要这样的限制,能否详细说明一下?如果缓冲区有限,数据流会增加一些小的开销。 - VMAtm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接