RXJS retryWhen 重置等待间隔。

7
我希望通过逐步增加的时间间隔来触发 retrywhen()。
   socketResponse.retryWhen(attempts => {
    return attempts.zip(Observable.range(1, 4)).mergeMap(([error, i]) => {
        console.log(`[socket] Wait ${i} seconds, then retry!`);
        if (i === 4) {
            console.log(`[socket] maxReconnectAttempts ${i} reached!`);
        }
        return Observable.timer(i * 1000);
    });
});

上面的代码运行良好。

在连接错误时(第一次)


  • [socket] 等待1秒,然后重试!//等待1秒
  • [socket] 等待2秒,然后重试!//等待2秒

在连接成功时


//连接成功。

在连接错误时(第二次)


  • [socket] 等待3秒,然后重试!//等待3秒
  • [socket] 等待4秒,然后重试!//等待4秒

现在我想在套接字连接成功时重置等待时间。

期望的输出:

在连接错误时(第一次)


  • [socket] 等待1秒,然后重试!//等待1秒
  • [socket] 等待2秒,然后重试!//等待2秒

在连接成功时


//连接成功。

在连接错误时(第二次)


  • [socket] 等待1秒,然后重试!//等待1秒

  • [socket] 等待2秒,然后重试!//等待2秒

但我不知道如何重置retrywhen()的时间间隔。

3个回答

1
我也遇到了同样的问题。我的目标是使用可观测对象包装套接字连接。我希望网络可观测对象永远不会完成(除非明确要求),并且在出现任何错误时无限重试连接。
以下是我如何实现这一点(使用rxjs 6+版本)。这是我的retryWhen操作符,使用缩放等待时间scalingDuration,并在maxDuration上限内进行无限次重试。
import { Observable, timer } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

export const infiniteRetryStrategy = (opts: { scalingDuration: number, maxDuration: number }, attempt?: () => number) => (attempts: Observable<{}>) => {
    return attempts.pipe(
        mergeMap((error, i) => {
            // use attempt if we have it, otherwise use the number of errors received
            const retryAttempt = attempt ? attempt() : i + 1;
            const waitDuration = Math.min(opts.maxDuration, retryAttempt * opts.scalingDuration);

            console.error(`Got error: ${error}. Retrying attempt ${retryAttempt} in ${waitDuration}ms`);
            return timer(waitDuration);
        })
    );
};

使用方法非常简单。如果您提供了attempt函数,则可以在外部管理重试次数,因此可以在下一个时钟周期上重置它。

import { retryWhen, tap } from 'rxjs/operators';

class Connection {
    private reconnectionAttempt = 0;

    public connect() {
        // replace myConnectionObservable$ by your own observale
        myConnectionObservable$.pipe(
            retryWhen(
                infiniteRetryStrategy({
                    scalingDuration: 1000,
                    maxDuration: 10000
                }, () => ++this.reconnectionAttempt)
            ),
            tap(() => this.reconnectionAttempt = 0)
        ).subscribe(
            (cmd) => console.log(cmd),
            (err) => console.error(err)
        );
    }
}

这并不完美,因为你需要在流之外保留状态,但这是我唯一设法做到的方法。欢迎任何更清晰的解决方案。


0

我最近通过使用一个辅助类来处理retryWhen处理程序来解决了这个问题。这是在RxJava而不是RxJS中,但我认为概念是相同的。在我的情况下,如果超过了最大重试次数,我想要出错,因此您可能需要调整该部分以执行您需要的操作。

private static class RetryWhenHandler implements Function<Observable<? extends Throwable>, Observable<?>> {
    int retryCount = 0;

    @Override
    public Observable<?> apply(Observable<? extends Throwable> attempts) throws Exception {
        if (++retryCount <= MAX_RETRIES) {
            return Observable.timer(i, TimeUnit.Seconds);
        } else {
            return Observable.error(throwable);
        }
    }
}

那么,以您上面的示例为例:

RetryWhenHandler myHandler = new RetryWhenHandler();

socketResponse
    .doOnNext(ignored -> myHandler.retryCount = 0)
    .retryWhen(myHandler);

这应该会在你从可观察对象获取更新时重置 retryCount,以便下次遇到错误时它为 0。


0
使用delayWhen创建一个通知器函数,该函数在每次错误发生后以递增的延迟时间发出信号。在下面的代码片段中,我正在尝试在0.5秒的递增倍数上重试。但是,如果需要,您可以使用不同的时间序列。在这里,我还展示了如何限制重试次数并在这种情况下抛出错误。
socketResponse$
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        // delay(500),                       // fix delay: 0.5 sec
        delayWhen((_,i) => timer(i*500)),    // increasing delay: 0.5 sec multiple
        tap(_ => console.log('retrying...')),
        take(2),                             // max. retries: 2 x
        concat(throwError('number of retries exceeded'))
      )
    )
  )
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接