RxJS:takeUntil忽略Subject事件

3
在下面的代码示例中,意图是通过向Subject mid$ 发出 1 来阻止来自second$ 的事件。
import { Subject, timer } from "rxjs";
import { switchMap, takeUntil, tap } from "rxjs/operators";

const first$ = timer(1000);
const second$ = timer(2000);

const mid$ = new Subject();

first$.pipe(
  tap(() => { 
    mid$.next(1); 
  }),
  switchMap(() => second$.pipe(
    takeUntil(mid$),
    tap(() => console.log("MISSED!"))
  )),
).subscribe();


mid$.subscribe(() => console.log("RECEIVED"));

Stackblitz

但由于控制台显示以下错误,因此某种原因它无法正常工作:

RECEIVED
MISSED!

即,第mid$.next(1);行中发出的事件未被takeUntil(mid$)考虑在内。

这里的逻辑是什么?

我注意到,如果我将第mid$.next(1);行替换为timer(0).subscribe(() => mid$.next(1));,它可以按预期工作,但我想知道在 RxJS 中处理这种情况的正确方式是什么。

2个回答

1

takeUntil仅在下一次observable发出时取消订阅。它不知道observable之前是否已经发出过。

const first$ = timer(1000);
const second$ = timer(2000);

const mid$ = new Subject();

first$.pipe(
  tap((first) => {
    console.log('first', first)
    mid$.next(1);
    console.log('first', first)
  }),
  switchMap(() => second$.pipe(
    tap((second) => console.log('second', second)),
    takeUntil(mid$), // Unsubscribing next time mid$ emits
    tap(() => console.log("MISSED!"))
  )),
).subscribe(second => console.log('final', second));


mid$.subscribe(() => console.log("RECEIVED"));

这将记录日志。
// 1s passes
first 0
RECEIVED
first 0
// 1s passes
second 0
MISSED!
final 0

这是正在发生的事情:

过了一秒钟,first$ 发出信号并且你使用 switchMap 切换到 second$。再过一秒钟,second$ 发出信号。只有此时你才告诉它在 mid$ 发出 下一个 信号后取消订阅。然而 mid$ 已经发出信号(且不会再次发出)。如果你将 second$ 替换为 interval(2000),你就会明白我的意思。"MISSED!" 将会每隔2秒记录一次。


感谢您的回复!如果我在第一个和第二个发射之间向mid$发送事件,它可以正常工作: timer(1500).subscribe(() => mid$.next(1));Stackblitz - moon-413

1
这样做不会按预期工作。
const first$ = timer(1000);
const second$ = timer(2000);

const mid$ = new Subject();

first$.pipe(
  tap(() => { 
    mid$.next(1);
  }),
  switchMap(() => second$.pipe(
    takeUntil(mid$),
    tap(() => console.log("MISSED!"))
  )),
).subscribe();

因为当到达mid$.next(1);时,switchMap的内部observable还没有被创建。所以,takeUntil还没有订阅那个mid$主题。
使用timer(0).subscribe(() => mid$.next(1));(大致相当于setTimeout(() => mid$.next() , 0))可以解决这个问题,因为在这种情况下,当mid$发出信号时,switchMap已经创建了内部observable。
解决这个问题的一种快速方法可能涉及使用BehaviorSubject而不是Subject,因为BehaviorSubject会向新的订阅者发送上次发出的值:
const first$ = timer(1000);
const second$ = timer(2000);

const mid$ = new BehaviorSubject(null);

first$.pipe(
  tap(() => { 
    mid$.next(1);
  }),
  switchMap(() => second$.pipe(
    // here, when `mid$` is subscribed, the subscriber will receive `1` 
    // and the entire inner observable will complete
    takeUntil(mid$),
    tap(() => console.log("MISSED!"))
  )),
).subscribe();

1
谢谢!使用BehaviorSubject时,takeUntil也会对初始值为null做出反应,即使我们删除了mid$.next(1)这一行,第二个$的发射也将被忽略。因此,我们应该改用类似于以下代码,以仅接受非null值: takeUntil(mid$.pipe(filter(x => x!== null))) - moon-413
顺便问一下,你能否推荐一些好的教程或书籍来学习可观察对象的创建细节,例如可观察对象的创建和订阅的确切顺序? - moon-413
1
@moon-413 当然,关于这个主题可能有很多优秀的资料,但我个人从阅读源代码中学到了很多。你也可以查看ncjamieson.comBen Lesh的文章。 - Andrei Gătej

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接