RxJava条件延迟响应

4

我希望将响应最大延迟500毫秒。 如果updateData花费了400毫秒,那么我希望延迟100毫秒。

以下代码总是延迟500毫秒。 目前,如果后端需要600毫秒,则总延迟为1100毫秒,但如果超过500毫秒,我不想延迟。

compositeDisposable.add(someRepo.updateData(dataRequest)
.observeOn(schedulerProvider.mainThread())
.delay(500, TimeUnit.MILLISECONDS)
.subscribeWith(dataUpdateObserver())
2个回答

1
尝试这样做。在启动someRepo.updateData(dataRequest)时,同时启动一个计时器。然后在完成someRepo.updateData(dataRequest)时获取计时器计数并找到所需的延迟时间。
如果已经超过500毫秒,则不需要进一步延迟。否则找到所需的延迟时间(500- elapsedTime)。以下代码执行相同操作。这里someRepo.updateData(dataRequest)返回Single<DataResponse>
   Subject<Boolean> timerStopper = PublishSubject.create(); // To terminate the timer
   Observable<Long> timer = Observable.interval(1, TimeUnit.MILLISECONDS)
                .takeUntil(timerStopper).publish().refCount();


   someRepo.updateData(dataRequest).doOnSubscribe(__ -> timer.subscribe())
  .flatMap(dataResponse -> timer.flatMapSingle(elapsed -> {
         timerStopper.onNext(true); // take the elapsed time and terminate the timer
         long requiredDelay = elapsed > MAX_DELAY ? 0 : MAX_DELAY - elapsed; // find the required delay from elapsed. If its already exceeded Maximum delay, no delay is required
         Log.d(TAG, "requiredDelay: " + requiredDelay);
         return Single.fromCallable(() -> dataResponse).delay(requiredDelay, TimeUnit.MILLISECONDS);})
        .map(data -> data).firstOrError())
   .subscribeWith(dataUpdateObserver());

-1
你可以定义一个全局变量来获取updateData操作的总时间,根据它所花费的时间,使用相应的延迟值。
long total = 0;

@Test
public void customDelay() {
    long start = System.currentTimeMillis();
    Subscription subscription = Observable.just("hello reactive world with custom delay")
            .map(value -> {
                //updateData logic
                try {
                    Thread.sleep(new Random().nextInt(600));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                total = System.currentTimeMillis() - start;
                if (total > 500) total = 0;
                return value;
            })
            .delay(total, TimeUnit.MILLISECONDS)
            .subscribe(n -> System.out.println("time:" + (System.currentTimeMillis() - start)));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}

您可以在此处查看示例运行https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/utils/ObservableDelay.java


嗨@paul,感谢您的帮助。但在我的情况下,Single<DataResponse> updateData(DataRequest dataRequest);我如何使用它,因为我没有使用Observable.just。让我们看一下我的片段someRepo.updateData(dataRequest) .observeOn(schedulerProvider.mainThread()) .delay(500, TimeUnit.MILLISECONDS) .subscribeWith(dataUpdateObserver() - Bulu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接