在rxjs中排队HTTP调用

3
我正在开发一个会话服务,用于检查认证令牌是否过期。如果过期,会进行刷新令牌的调用。在此请求期间,所有传入的请求都应该排队,并在请求完成后一起处理。之后,所有传入的请求可以继续通过而无需排队,直到令牌再次过期。我为此画了一个大理石图表:
1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--

我将1.称为incoming$可观察对象,2.是valve$ - 如果它为true,则请求可以通过,如果它为false,则应该排队。当它变为true时,排队的请求会被触发。
到目前为止我做了什么?我认为这可以通过添加一个中间的Observable来完成,称为receiver$,它根据valve$的值改变自己的值。当valve$为true时,它只返回一个简单的主题,如果它为false,则返回一个能够记录值的主题。
receiver$ = valve.pipe(
  map((value) => {
    if (value) {
      return new Subject();
    } else {
      return (new Subject()).pipe(
        shareReplay(),
      );
    }
  })
);

然后,incoming$ 中获取的每个新值都应添加到 recevier$ 中的当前 observable 中:

incoming$.pipe(
  combineLatest(receiver$),
).subscribe((incomingValue, recevier) => {
  recevier.next(incomingValue);
});

这里有一部分我无法理解。每当 valve 变为 true 时,我需要从 receiver$ 中获取最后两个值。倒数第二个值将保存队列,最后一个值将保存活动主题。通过合并它们,我可以实现我的目标。我不知道如何实现这个功能以及如何管理订阅。此外,对于这样一个看似简单的用例来说,这看起来过于复杂。

实现这种行为的最佳方式是什么?


我大约一年前使用了egghead.io来进行RxJs的一些高级操作(现在大部分都忘记了 - 因为我正在做其他事情!)也许可以看看那个网站,或者尝试使用一些Google Kungfu来追踪作者的博客。 - JGFMK
2个回答

1
你可以考虑这样的解决方案。
首先,你需要创建一个主题,通过它发出所有你想要进行的请求。
const requests$ = new Subject<Observable<any>>()

然后,您可以创建一个主题(Subject),通过它来传达阀门的状态,即您是否可以立即执行请求或者必须将其缓冲

const valve$ = new Subject<boolean>();

现在你可以创建一个流,仅在 阀门 打开时传递请求,即仅在 valve$ 发出的最后一个值为 true 时。
const openStream$ = valve$.pipe(
  switchMap(valve => {
    if (valve) {
      return requests$;
    } else {
      return empty();
    }
  })
);

阀门关闭时,您还可以创建一个缓冲所有请求的流

const bufferedStream$ = requests$.pipe(
  bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
  mergeMap(bufferedCalls => bufferedCalls)
)

现在您需要做的就是将openStream$bufferedStream$进行merge,并订阅结果流,如下所示。
merge(openStream$, bufferedStream$).pipe(
  mergeMap(request => request)
)
.subscribe(httpCallResult => {// do stuff})

我已经使用以下数据测试了此解决方案,使用字符串的可观察对象模拟真实的http调用。
const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);


const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);

1
您可以通过使用concatMap来实现此操作,该操作基于valve$的值合并两个不同的流。请注意,这需要使用share()共享valve$incoming$两个流。
valve$
  .pipe(
    concatMap(v => v
      ? incoming$.pipe(takeUntil(valve$))
      : incoming$
        .pipe(
          takeUntil(valve$),
          bufferCount(Number.POSITIVE_INFINITY),
          mergeAll(),
        )
    ),
  )
  .subscribe(console.log)

现场演示:https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts


我也考虑过使用concatMap,但问题在于它会将所有请求排队,如果incoming$为真,则不等待其他请求完成更有效率。 - maxime1992
incoming$ 从未为 true,而 concatMap 也不会排队任何请求。 - martin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接