使用响应式扩展中的Observable.Publish

3

我对使用Observable.Publish进行多播处理的生命周期有点困惑。如何正确使用connect?令人意外的是,我发现我不需要调用connect来启动多播观察者的订阅。

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);

// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()

// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?

编辑

当我没有显式调用IConnectableObservable的Connect方法时,我为什么能够成功订阅是我的疑惑。然而,我在IConnectableObservable上调用了Await方法,该方法隐式地调用了Connect方法。

Public Async Function MonitorMeasurements() As Task


    Dim cts = New CancellationTokenSource

    Try
        Using dialog = New TaskDialog(Of Unit)(cts)

            Dim measurementPoints = 
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()

            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()

            Await measurementPoints
        End Using
    Catch ex As TimeoutException
        MessageBox.Show(ex.Message)
    Catch ex As Exception
        MessageBox.Show(ex.Message)
    End Try

End Function

请注意,我的TaskDialog公开了一个名为CancelObserved的observable,用于在按下取消按钮时观察。

解决方案

解决方案由@asti发布的链接中提供。以下是来自该链接中RX团队的引用:

请注意,使用await会通过导致订阅来使observable序列变热。本次发布中包括IConnectableObservable的await支持,这将导致连接序列到其源以及订阅它。如果没有Connect调用,await操作将永远无法完成。


我想我们都编辑了链接。哦,算了! - Asti
2个回答

6
在源上发布返回一个>,它本质上是带有Connect方法的>。您可以使用Connect和它返回的来控制对源的订阅。
Rx旨在成为一个“fire and forget”系统。除非您明确处理它们或它们完成/出错,否则订阅不会终止。
例如,disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...) - 订阅不会终止,直到显式处理disp0, disp1 - 这与连接到多播源无关。
您可以连接和断开连接而不干扰下面的管道。不用担心手动管理连接的更简单的方法是使用.Publish().RefCount(),只要至少有一个观察者仍然订阅它,它就会保持连接。这被称为预热observable。

更新问题后的版本

OP在IConnectableObservable<T>上调用了await

来自Rx的发布说明:

..使用await使可观察序列变为热的,通过引起订阅。这个版本中包含了对IConnectableObservable的await支持,它会导致连接序列到其源以及订阅它。如果没有调用Connect,await操作将永远不会完成。

示例(取自同一页)

static async  void Foo()
{
    var xs = Observable.Defer(() =>
    {
        Console.WriteLine("Operation started!");
        return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
    });

    var ys = xs.Publish();

    // This doesn't trigger a connection with the source yet.
    ys.Subscribe(x => Console.WriteLine("Value = " + x));

    // During the asynchronous sleep, nothing will be printed.
    await Task.Delay(5000);

    // Awaiting causes the connection to be made. Values will be printed now,
    // and the code below will return 9 after 10 seconds.
    var y =  await ys;
    Console.WriteLine("Await result = " + y);
}

我的困惑在于,我只调用了.Publish()而没有在结果上调用connect,但它仍然可以工作。 - bradgonesurfing
啊哈!!我正在调用await来等待IConnectableObservable序列完成。我猜这其实是在隐式地调用Connect :) - bradgonesurfing
1
@bradgonesurfing 哇,我真的没有想到。这是公告链接:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/23062737-e154-41af-99f6-45d819992254/ 请看第二篇帖子。 - Asti
请注意,使用await会使可观察序列变为热序列,因为它会导致订阅发生。在此版本中包含了对IConnectableObservable<T>的await支持,这将导致连接序列到其源并订阅它。如果没有调用Connect,则await操作将永远无法完成。 - bradgonesurfing
在阅读了您提供的链接并了解了“随处等待”的概念后,我有了一种顿悟。请参见https://dev59.com/wmnWa4cB1Zd3GeqP5egK - bradgonesurfing
1
回答你的问题 - F# 的计算表达式是用于单子结构的表达式重写,而 async 则是将用户代码重写为状态机 - 它不太强大。在 C# 中尝试这样做会让你与语言作斗争。 - Asti

5
发布(Publish)功能允许您共享一个订阅。这对于将一个冷(Cold)的可观测序列变为热(Hot)的是非常有用的。也就是说,将会导致某些订阅影响的序列(比如连接到网络)执行一次,并确保该影响仅执行一次,然后在消费者之间共享序列结果。
实际上,在您的冷序列上调用发布(publish),在订阅消费者之后连接(connect)已发布的序列,以减轻任何竞争条件。
所以,基本上,您已经完成了上述操作。
对于已经是热序列(Hot sequences)的情况,如主题(Subjects)、FromEventPattern或已发布和连接的内容,它们很大程度上没有意义。
从Connect()方法处处置值将“断开”序列,从而防止消费者获得更多值。如果其中任何订阅想要提前分离,则也可以处置消费者订阅。
尽管如此,您似乎正在做正确的事情。您遇到的问题是什么?我假设您正在连接到一个已经是热序列(Hot sequence)。

连接不是“热的”。订阅后将建立一个UDP连接,取消订阅后UDP连接应该关闭。但是解析UDP帧会导致每个帧有几个字段。每个字段都需要设置为可观察对象,因为它们具有不同的客户端观察者。如果我不调用Publish,则会创建多个连接到同一套接字,这当然是错误的。调用Publish可以给我正确的结果,但我似乎不必调用Connect,这是一个谜题。 - bradgonesurfing
啊...也许您有一个淘气的方法?当您连接到消息层并且没有将代码包装在Observable.Create(...)中时,可能会出现返回渴望值(非惰性)IObservable的方法的情况。 - Lee Campbell
没有看到问题的更新,我通过 @asti 发布了解决方案。在 ICOnnectableObservable 上调用 await 隐式地调用 connect。 - bradgonesurfing
忽略那个..我看到你现在有答案了。是的,你希望Await连接可观察对象,否则它怎么会被连接呢? - Lee Campbell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接