RxJava / RxJs: 如何合并两个源observable,但在其中一个完成时立即完成

9

我有两个源可观察对象。

我想合并这两个源可观察对象,但是合并后的可观察对象应该在其中一个源可观察对象完成时立即完成。

所需行为:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x

如果其中一个源发生错误,错误应该传播到合并的可观察对象:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex

“合并”操作符仅在两个源都完成时才完成合并流:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x

我该如何实现我想要的行为?
4个回答

12

您需要处理元数据,即每个可观测对象的信息。为此,请在每个流上使用materialize()操作符,并在合并的流上使用dematerialize()以实际发出数据。

Observable.merge( observableA.materialize(),
                  observableB.materialize() )
  .takeWhile( notification -> notification.hasValue() )
  .dematerialize()
  .subscribe( ... );

这将合并这两个可观察对象,直到其中一个完成或发出错误。


1
不错!有几个语法纠正:-> 应该改为 =>hasValue 是一个属性而不是一个函数。 - bygrace
抱歉,我正在用Java编写回复。需要翻译成其他语言。 - Bob Dalgleish
啊,我在想用 JavaScript,但我猜这个问题标记了两种语言 :) - bygrace
1
takeWhile不会将失败转换为完成,似乎只需要dematerialize,当任一源发出完成或失败时,它将发出完成或失败,并终止合并。 - Michael Krussel
@MichaelKrussel的评论是正确的。TakeWhile将隐藏失败。另一方面,简单地省略它并Dematerializing将产生预期的结果,并在任何可观察到的错误或完成时发出错误或完成。 - adrian h.

1
我希望有人能提供更优雅的方法,但这个方法有效。
我认为你需要使用其中一个“take”运算符。当一个源完成时,你可以像这样完成所有源:

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
Rx.Observable.merge(a.takeUntil(b.last()), b.takeUntil(a.last()))
  .subscribe(
    x => { console.log('next', x); },
    null,
    () => { console.log('complete'); }
  );
  
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

或者一个不太易读但更具可扩展性的版本:

function merge(...obs) {
  return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);

merge(a, b)
  .subscribe(
    x => { console.log('next', x); },
    null,
    () => { console.log('complete'); }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

这是一个与错误传播有关的插图:

function merge(...obs) {
  return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}

const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
const c = Rx.Observable.timer(2200).map(x => { throw 'oops!'; });

merge(a, b, c)
  .subscribe(
    x => { console.log('next', x); },
    x => { console.log('error', x); },
    () => { console.log('complete'); }
  );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

在合并操作中使用 takeUntil 需要小心,因为你会丢失最后一个发出的值。


1
当一个可观察对象完成时,它不会发出任何值,但我们可以将其与另一个发出单个值的“信号”可观察对象concat在一起。然后,我们可以使用takeWhile操作符来监视“信号”可观察对象的值。
当然,您必须确保“信号”可观察对象发出的值不是可合并的可观察对象可能发出的值-如果takeWhile谓词通过引用比较,则空对象就足够了。
以下是一个示例:

const obs1$ = Rx.Observable.interval(1000)
    .map(x => `obs1: ${x}`)
    .take(5);

const obs2$ = Rx.Observable.interval(300)
    .map(x => `obs2: ${x}`)
    .take(9);

const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);

Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
    .takeWhile(x => x !== signalFinishMessage)
    .subscribe(
        x => console.log(x),
        err => console.log('received error:', err),
        () => console.log('complete')
    );
    
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>

错误也会被传播:

const obs1$ = Rx.Observable.interval(1000)
    .map(x => `obs1: ${x}`)
    .take(5);

const obs2$ = Rx.Observable.interval(300)
    .map(x => `obs2: ${x}`)
    .take(9)
    .concat(Rx.Observable.throw(`the world's about to end`));

const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);

Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
    .takeWhile(x => x !== signalFinishMessage)
    .subscribe(
        x => console.log(x),
        err => console.log('received error:', err),
        () => console.log('complete')
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>


1
我最终自己编写了代码:
import { Observable } from 'rxjs';

export function whileAll<T>(...observables: Observable<T>[]): Observable<T> {
  return new Observable<T>(function (observer) {
    if (observables.length === 0)
      observer.complete();
    else {
      const next = observer.next.bind(observer);
      const error = observer.error.bind(observer);
      const complete = observer.complete.bind(observer);
      for (let i = 0; i < observables.length; i++)
        observer.add(observables[i].subscribe(next, error, complete));
    }
  });
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接