测试调度程序在RxJava上无法使用

5

我正在尝试测试一个函数,其中流的元素在延迟后逐个分派,我能够使用Thread.sleep使我的测试工作正常。然而,当我使用TestScheduler.advanceTimeBy时,我无法得到任何结果。

请查看以下代码:

public Observable<Object> getDelayedObjects(Observable<Observable<Object>> objectsStreams) {
    objectsStreams.concatMap(objectsStream ->
        objectsStream.repeat().concatMap(object ->
            Observable.just(object)
                      .delay(getDuration(object), TimeUnit.MILLISECONDS)));
}

测试代码如下:

TestScheduler testScheduler = new TestScheduler();
BehaviorSubject<Observable<Object>> objectStreamSubject = BehaviorSubject.create(objectsStream);

model.getDelayedObjects(objectStreamSubject)
        .observeOn(testScheduler)
        .subscribeOn(testScheduler)
        .subscribe(testSubscriber);

testScheduler.triggerActions();
//Thread.sleep(900) works with the default scheduler
testScheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
testSubscriber.assertReceivedOnNext(objects);

更新:

查看了一下TestScheduler的使用方法后,我发现将调度器传递给delay函数是常见的做法。所以我通过在getDelayedObjects方法和delay方法中提供调度器参数来让测试通过。但是,我仍然不知道为什么之前不起作用。

1个回答

6
delay操作符默认使用计算调度器执行基于时间的延迟。这些信息可以在该方法的文档中找到。查找@SchedulerSupport注释中的值,本例中为io.reactivex:computation
为了进行测试,您需要将计算调度器替换为TestScheduler。要进行替换,您需要使用delay操作符之一的多个重载版本,其中包含一个Scheduler参数。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接