RxJava 2如何在单元测试中覆盖IO调度器

22

我正在尝试测试以下 RxKotlin/RxJava 2 代码:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

我正在尝试按照以下方式覆盖调度程序:

// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }

但是,当我运行测试时,出现以下错误:

java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
    at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
    at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
    at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
    at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)

有人遇到过这个问题吗?


在使用RxKotlin/RxJava 1和以下调度程序覆盖时,测试工作正常:

RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
    override fun getMainThreadScheduler() = Schedulers.immediate()
})

RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
    override fun getIOScheduler() = Schedulers.immediate()
})

1
请查看2.0.8的更新Javadoc文档:http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#io() - akarnokd
1
请注意,由于可能存在初始化循环,使用任何其他返回调度程序的方法都会导致NullPointerException。 - Kiskae
2
一旦Schedulers类被初始化,您可以通过RxJavaPlugins.setIoSchedulerHandler(io.reactivex.functions.Function)方法覆盖返回的Scheduler实例。 - akarnokd
谢谢!我之前尝试使用setIoSchedulerHandler,但是flatMap没有被调用。最终找到了原因:validate方法返回了一个执行了emitter.onNext(null)的observable :/ 由于RxJava 2不再接受null值,我将其更改为Completable,现在测试已经通过了! - Alex
4个回答

24

我建议您采用不同的方法,并向您的调度程序添加一层抽象。这位作者在他的文章中提供了一个不错的示例。

在 Kotlin 中,它看起来会像这样:

interface SchedulerProvider {
    fun ui(): Scheduler
    fun computation(): Scheduler
    fun trampoline(): Scheduler
    fun newThread(): Scheduler
    fun io(): Scheduler 
}

然后你可以使用自己的 SchedulerProvider 实现来覆盖它:

class AppSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return AndroidSchedulers.mainThread()
    }

    override fun computation(): Scheduler {
        return Schedulers.computation()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.newThread()
    }

    override fun io(): Scheduler {
        return Schedulers.io()
    }
}

还有一个用于测试类的例子:

class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun computation(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun io(): Scheduler {
        return Schedulers.trampoline()
    }
}

当你调用 RxJava 时,你的代码将会像这样:

mCompositeDisposable.add(mDataManager.getQuote()
        .subscribeOn(mSchedulerProvider.io())
        .observeOn(mSchedulerProvider.ui())
        .subscribe(Consumer<Quote> {
...

你只需要根据测试环境重写SchedulerProvider的实现。这是一个样例项目,我会链接使用可测试版本SchedulerProvider的测试文件:https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31


感谢分享“trampoline”。我使用了“immediate”,但在Rx2中不再可用。 - Leo DroidCoder
2
我认为我们不需要这个抽象层。感觉我们只是为了测试而对生产代码进行了重大修改,而我们已经有了无需这种抽象的测试手段。@Alex提供的答案完美地解决了问题。 - Henry
1
我必须赞同@Henry的观点。使用RxJavaPlugin...在测试期间修改调度程序似乎更实际,而不是仅仅为了测试而修改生产代码。 - William Reed

14

我明白了!这与这段代码中的事实有关:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

validate(data)返回了一个Observable,它发出了以下内容:emitter.onNext(null)。由于RxJava 2不再接受null值,flatMap没有被调用。我将validate更改为返回Completable,并更新调度程序覆盖如下:

RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }

现在测试通过了!


6
作为替代方案,这在我的项目中一直很好用。您可以在测试类中像这样使用它:
@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()

这个类看起来像这样:

class ImmediateSchedulersRule : ExternalResource() {

    val immediateScheduler: Scheduler = object : Scheduler() {

        override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })

        // This prevents errors when scheduling a delay
        override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            return super.scheduleDirect(run, 0, unit)
        }

    }

    override fun before() {
        RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }

        RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
        RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
    }

    override fun after() {
        RxJavaPlugins.reset()
    }

}

您可以在这里找到从TestRule迁移到ExternalResource的方法,并在这里获取有关测试RxJava 2的更多信息。


5
这是我使用的确切语法:

这是我使用的确切语法:

RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接