RxJS:WebSocket 重新连接处理

4
我创建了一个服务来处理WebSocket连接以发送和接收消息,在其中我处理了一些可能会导致断开连接的情况以及如何从中恢复,但是我忽略了浏览器本身(话说)自动重试的情况,即当navigator.onLine发出true值时。

到目前为止,通过使用retryWhen运算符,当服务器端出现错误时,我能够创建一个新的会话并重新启动它,直到达到最大重试次数。 但是,通过使用navigator.onLine(尽管不可靠,但我知道它适用于我们正在使用的浏览器),我可以在客户端离线时重新启动重新连接过程。

我有这个在线检查器observable:

private onlineCheck: Observable<boolean>;

this.onlineCheck = merge(
      of(navigator.onLine),
      fromEvent(window, 'online').pipe(mapTo(true)),
      fromEvent(window, 'offline').pipe(mapTo(false)));

这是我创建WebSocket并处理重新连接的方法:
public connect(): void {
  // no more than one connection simultaneously
  if (!(this.socket == null)) {
    this.socket.complete();
  }

  this.socket = new WebSocketSubject(this.config);
  this.socket
    .pipe(
      retryWhen((errors) => {
        return errors.pipe(
          tap((error) => { console.log('Error: ', error); }),
          concatMap((e, i) =>
            iif(
              () => i < this.reconnectAttempts,
              of(e).pipe(delay(this.reconnectInterval)),
              throwError('Max amount of retries reached')
            )));
      }))
    .subscribe(
      (message: MessageEvent) => { this.onMessage(message); },
      (error: Event) => { this.onError(error); },
      () => { console.log('completed'); });
  }

如何将onlineCheck订阅与retryWhen操作符结合使用,以便在客户端下线(且达到最大连接重试次数)并重新上线时重试它?如果这不是最佳方法,请建议其他方法。
以下是一个完整的StackBlitz示例(请检查控制台日志以获得更好的反馈):https://stackblitz.com/edit/angular-f4oa3y
1个回答

1
在您的iif真条件中,请使用此可观察对象,而不是of(e).pipe(delay(this.reconnectInterval)):
timer(this.reconnectInterval) // wait for reconnect interval
  .pipe(
    concatMap(() => onlineCheck),
    first(Boolean) // now wait for online to be true
  )

这将发出一个可观察对象,在重新连接间隔在线检查为true后产生单个值


谢谢你的回复,但我认为这并没有按预期工作。我甚至在其他地方为onlineCheck添加了一个订阅,每当navigator.onLine改变状态时,该订阅会发出新值,但是你添加的这个新的重试流程却没有发生。 正如我在问题中提到的,可能有一些情况下reconnectAttempts将达到其最大值;如果navigator.onLinefalse,这实际上可能发生。因此,当它再次发出true值时,应该再次尝试连接套接字。 - NunoM

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接