如何使一个Observable序列在发出之前等待另一个序列完成?

132

假设我有一个 Observable,像下面这样:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

接着,我有了第二个 Observable

var two = someOtherObservable.take(1);

现在,我想要subscribe()two,但是我希望确保在触发two的订阅器之前已经完成了one

我可以在two上使用什么样的缓冲方法来使第二个订阅器等待第一个订阅器完成呢?

我想我正在寻找一种方法来暂停two直到one完成。


1
我认为这个问题的答案是使用 .exhaustMap() 方法,但我不会假装知道如何实现它 - 完整描述在这里:https://blog.angular-university.io/rxjs-higher-order-mapping/ - Peter Nixey
11个回答

71

我能想到几种方式

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));

2
最终我使用了pauseresume而不是publishconnect,但第二个示例基本上是我采取的路线。 - Stephen
3
在subscribe()函数中,这种方法是否总是会让第一个observable(one)先被解析,然后才是第二个(two)? - John
1
为什么不使用 Observable.forkJoin()?请参考此链接 https://www.learnrxjs.io/operators/combination/forkjoin.html - mspasiuk
23
根据OP的要求,他们只希望第二个订阅者在第一个完成后才订阅。而forkJoin会同时订阅。 - paulpdaniels
@paulpdaniels 一个问题需要我理解:在第一种方法中,调用 concat(one, two) 上的 .pipe(take(1)) 不就足够了吗?这样做:concat(one, two).pipe(take(1)).subscribe(function() { /* do something */}); - Spray'n'Pray
1
@Spray'n'Pray 不行,因为这样会在从 one 接收到第一个值后完成订阅,所以它甚至不会订阅 two - paulpdaniels

25

如果您希望确保执行顺序保持不变,您可以像以下示例一样使用flatMap。

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

结果将会是:

"1"
"11"
"111"
"finished"

24

skipUntil()与last()方法

skipUntil: 忽略直到另一个observable发出值

last: 从序列中发出最后一个值(即等待序列完成后再发出)

请注意,任何从传递给skipUntil的observable中发出的内容都会取消忽略,这就是为什么我们需要添加last() - 等待流完成。

main$.skipUntil(sequence2$.pipe(last()))

官方文档:https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


可能的问题:请注意,如果没有发出任何内容,last()本身会出错。当与谓词一起使用时,last()运算符确实具有default参数。我认为,如果这种情况对您造成了问题(如果sequence2$可能在不发出任何内容的情况下完成),则应该使用其中一个解决方案(目前未经测试):
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

请注意,undefined是可以被发出的有效项,但实际上它可以是任何值。还要注意,这是连接到sequence2$而不是main$的管道。

非常笨拙的演示:https://angular-vgznak.stackblitz.io 您需要点击打开控制台托盘。 - Simon_Weaver
你的语法有误。skipUntil不能直接附加到observable上,否则会出现以下错误:“在类型'Observable<any>'上不存在属性'skipUntil'。”你需要先通过.pipe()运行它。 - London804
是的,这是在管道被要求之前的旧答案。谢谢你提到它。我现在会更新它,但我正在使用手机。请随意编辑答案。 - Simon_Weaver

22

这是一种可重复使用的方法(它是TypeScript,但你可以将其适应为JS):

function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) => signal.pipe(
        first(),
        switchMap(_ => source),
    );
}

你可以像使用任何运算符一样使用它:

var two = someOtherObservable.pipe(waitFor(one), take(1));

它基本上是一种运算符,它推迟了对源可观测对象的订阅,直到信号可观测对象发出第一个事件。


有没有这个可重用函数的 RxSwift 版本? - sujith1406
1
请注意,这里与OP请求的内容有轻微的语义差异。在这种情况下,如果您请求“first”,并且信号“Observable”为空,则会出现错误。OP的示例使用了“take”,这将允许源为空但仍然运行第二个。 - paulpdaniels
虽然这可能与OP略有偏差,但我发现我的问题很简单,我想要一个可观察对象与NGRX通信,等待一个正在执行与firestore分开查找的可观察对象(明显比NGRX慢)。这个等待解决了我的问题,没有任何性能或内存影响 - 做得好。 - KeaganFouche

15

这里是利用switchMap的结果选择器的另一种可能性

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

由于switchMap的结果选择器已经被弃用,这是更新后的版本。

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});

6
如果第二个可观察对象是的,那么有另一种方法来实现暂停/恢复
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

另外,您还可以使用缓冲版本的pausableBuffered,以便在暂停期间保留数据。


5

这是使用TypeScript编写的自定义操作符,它在发出结果之前等待一个信号:

export function waitFor<T>(
    signal$: Observable<any>
) {
    return (source$: Observable<T>) =>
        new Observable<T>(observer => {
            // combineLatest emits the first value only when
            // both source and signal emitted at least once
            combineLatest([
                source$,
                signal$.pipe(
                    first(),
                ),
            ])
                .subscribe(([v]) => observer.next(v));
        });
}

你可以这样使用它:
two.pipe(waitFor(one))
   .subscribe(value => ...);

1
不错的模式!你甚至可以使用 three.pipe(waitFor(one), waitFor(two), take(1))。 - David Rinck
你在一个不正常的操作符内进行了订阅,依我看。 - Mehdi Benmoha
@MehdiBenmoha 为什么这样做?这是使用 first(), 运算符的订阅。我认为在性能方面是安全的。 - Sergiu

2

这里有另一种方法,我觉得更加直接和直观(或者至少对于习惯Promise的人来说更自然)。基本上,你可以使用Observable.create()创建一个Observable,将onetwo包裹在一个Observable中。这与Promise.all()的工作方式非常相似。

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

所以,这里发生了什么?首先,我们创建了一个新的Observable。传递给Observable.create()的函数,适当地命名为onSubscription,会接收从参数中传递给subscribe()的观察者对象(类似于创建新Promise时传递给resolvereject的参数合并到一个对象中)。这就是我们的魔法所在。

onSubscription中,我们订阅了第一个Observable(在上面的示例中,它被称为one)。我们如何处理nexterror由你决定,但默认提供的示例对大多数情况来说应该是适当的。然而,当我们接收到complete事件时,也就是说one现在已经完成,我们可以订阅下一个Observable;这样,在第一个Observable完成后,第二个Observable就会被触发。

针对第二个Observable提供的示例观察者非常简单。基本上,second现在的行为就像OP中预期的two一样。更具体地说,second将发出someOtherObservable发出的第一个值,然后完成(由于take(1)),假设没有错误。

示例

以下是一个完整的工作示例,如果您想看到我的示例在实际运行中的效果,可以将其复制/粘贴:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

如果您观察控制台,上面的示例将打印出以下内容:

1

6

完成!


这是我需要的突破口,以创建自己的定制“cluster(T, X, D)”运算符,该运算符仅处理源中在时间跨度T内的前X个发射,并通过D延迟间隔发出结果。谢谢! - wonkim00
我很高兴它有所帮助,当我意识到这一点时,也感到非常启发。 - c1moore

1

嗯,我知道这很老了,但我认为你可能需要的是:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two's subscription.
})

0
也许你可以使用 delayWhen 操作符。
我们有两个可观察对象 one$two$。第一个可观察对象延迟 1 秒后发出 1 然后完成。第二个可观察对象只有在 one$ 发出后才会发出 2

const one$ = of(1).pipe(
  delay(1000),
  tap(() => console.log('one$ emitted'))
);

const two$ = of(2).pipe(
  delayWhen(() => one$),
  tap(() => console.log('two$ emitted')),
);

two$.subscribe(n => {
  console.log(`n=${n}`);
});
<script src="https://unpkg.com/rxjs@7.5.5/dist/bundles/rxjs.umd.min.js"></script>
<script>
const {of} = rxjs;
const {delay, delayWhen, tap} = rxjs.operators;
</script>


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接