RxJava2的Publish

9
什么是两者之间的区别?
ObservableTransformer {
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )
}

并且

ObservableTransformer {
    it.publish{ shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    }
}

当我使用这两个运行我的代码时,我得到了相同的结果。在这里 publish 做了什么。
2个回答

26

区别在于顶部的转换器将为下游的单个订阅而两次订阅上游,复制通常不需要的上游任何副作用:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
      .doOnSubscribe(s -> System.out.println("Subscribed!"));


mixedSource.compose(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

将打印

Subscribed!
2
3
4
Subscribed!
A
B
C

这里表示的副作用是输出 Subscribed!。根据实际源中的工作,这可能意味着将电子邮件发送两次,两次检索表的行。通过这个特定的示例,您可以看到即使源值在类型上交错,输出也将它们分开。

相比之下,publish(Function) 会为每个终端订阅者建立一个对源的订阅,因此源处的任何副作用只会发生一次。

mixedSource.publish(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

打印输出

Subscribed!
A
2
B
3
C
4

因为源只订阅一次,每个项目都会被多播到.ofType().compose()的两个“分支”中。


4
我一直在使用Jake Wharton演讲中提到的“状态管理”,直到现在我才知道它是什么。非常简明清晰,非常感谢。 - Hohenheim

2

publish 操作符将您的 Observable 转换为 Connectable Observable

让我们看一下 Connectable Observable 是什么意思:假设您想多次订阅一个 Observable 并希望将相同的项目提供给每个订阅者。您需要使用 Connectable Observable

示例:

var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Connect();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));

输出:

first subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2

在这种情况下,我们在第一次订阅时足够快地进行订阅,但仅限于第一次订阅。第二次订阅较晚,错过了第一次发布。
我们可以将 Connect() 方法的调用移动到所有订阅都完成后再进行。这样,即使有 Thread.Sleep 的调用,我们也不会真正订阅底层,直到两个订阅都完成。操作如下:
var period = TimeSpan.FromSeconds(1);
var observable = Observable.Interval(period).Publish();
observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i));
Thread.Sleep(period);
observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i));
observable.Connect();

输出:

first subscription : 0 
second subscription : 0 
first subscription : 1 
second subscription : 1 
first subscription : 2 
second subscription : 2 

因此,使用Completable Observable,我们有一种控制何时让Observable发出项的方法。
示例来自:http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#PublishAndConnect 编辑:根据此链接中第180张幻灯片的说明:
发布的另一个特性是,如果任何观察者在Observable开始发出项目10秒后开始观察,则观察者仅获取在其订阅时(即10秒后)发出的项目,而不是所有项目。因此,在某些情况下,我能理解publish被用于UI事件。任何观察者只应接收在其订阅之后执行的那些事件,而不是所有已发生的事件。
希望对您有所帮助。

感谢@chandil03。是的,我知道发布的想法。但是看一下这个链接上的第180张幻灯片。为什么在调用Observable.merge()之前需要发布? - Hohenheim
@NovoDimaporo,请检查编辑并让我知道,如果您有任何疑问。 - Sachin Chandil
如果您能看到整个代码,就只有一个订阅发生了,没有进一步的订阅。正如您所看到的,它在observableTransformer中使用,发布的observable实际上并没有在外部显示,除非在'Observable.merge()'上,那么它的用途是什么? - Hohenheim
@NovoDimaporo 为什么你只考虑了简单情况?简单情况往往不能代表真实的使用情况。如果你需要使用两个订阅者,或者想要在一系列操作后使用 connect() 操作符让观察者发出项目,那该怎么办呢?如果你没有描述任何这样的用例,那么在这里使用 publish 操作符就没有意义了。 - Sachin Chandil
@chandil03,“Hohenheim”所说的是发布操作符,它接受一个函数作为参数(返回Observable而不是ConnectableObservable),而不是没有参数的publish()操作符。 - HiddenDroid

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接