RxJava 2 overriding IO scheduler in unit test

4

22

I'm trying to test the following RxKotlin/RxJava 2 code:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

I'm attempting to override the schedulers as follows:

// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }

However, I get the following error when running the test:

java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
    at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
    at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
    at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
    at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)

Has anyone experienced this problem?


The test worked fine when using RxKotlin/RxJava 1 and the following scheduler overrides:

RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
    override fun getMainThreadScheduler() = Schedulers.immediate()
})

RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
    override fun getIOScheduler() = Schedulers.immediate()
})
Somehow answered 7/4, 2017 at 18:40 Comment(4)
See the updated Javadoc for 2.0.8: reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/… – Verlinevermeer
Specifically "Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException." – Welltodo
"Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins.setIoSchedulerHandler(io.reactivex.functions.Function) method." – Verlinevermeer
Thanks! I tried previously using setIoSchedulerHandler, but flatMap was not getting called. Finally figured out why: The validate method was returning an observable that did emitter.onNext(null) :/ Since nulls are no longer accepted in RxJava 2, I changed that to a Completable and tests now pass! – Somehow
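
To illustrate the Javadoc passage quoted in the comments: the runtime handler is consulted each time Schedulers.io() is called, after the Schedulers class has finished initializing, so referring to Schedulers.trampoline() inside it should be safe. The sketch below is only an illustration; the helper name ioThread is made up for this example and is not part of RxJava.

```kotlin
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns the thread on which work subscribed
// via Schedulers.io() actually runs.
fun ioThread(): Thread =
    Observable.fromCallable { Thread.currentThread() }
        .subscribeOn(Schedulers.io())
        .blockingFirst()

fun main() {
    val caller = Thread.currentThread()

    // Default behaviour: io() hands the work to a pooled background thread.
    check(ioThread() !== caller)

    // Runtime handler: applied on every Schedulers.io() call, so it is fine
    // to reference Schedulers.trampoline() here (unlike in the init handler).
    RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
    try {
        // With the trampoline in place the work runs on the calling thread.
        check(ioThread() === caller)
    } finally {
        // Clear all handlers so later code sees the real schedulers again.
        RxJavaPlugins.reset()
    }
}
```

Calling RxJavaPlugins.reset() in a finally block (or an @After method) keeps the override from leaking into other tests.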
24

I suggest you take a different approach and add a layer of abstraction to your schedulers. This guy has a nice article about it.

It would look something like this in Kotlin:

interface SchedulerProvider {
    fun ui(): Scheduler
    fun computation(): Scheduler
    fun trampoline(): Scheduler
    fun newThread(): Scheduler
    fun io(): Scheduler 
}

And then you override that with your own implementation of SchedulerProvider:

class AppSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return AndroidSchedulers.mainThread()
    }

    override fun computation(): Scheduler {
        return Schedulers.computation()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.newThread()
    }

    override fun io(): Scheduler {
        return Schedulers.io()
    }
}

And one for testing classes:

class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun computation(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun io(): Scheduler {
        return Schedulers.trampoline()
    }
}

Your code would look like this where you call RxJava:

mCompositeDisposable.add(mDataManager.getQuote()
        .subscribeOn(mSchedulerProvider.io())
        .observeOn(mSchedulerProvider.ui())
        .subscribe(Consumer<Quote> {
...

And you'll just override your implementation of SchedulerProvider based on where you test it. Here's a sample project for reference, I am linking the test file that would use the testable-version of SchedulerProvider: https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31
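
For a rough idea of what such a test could look like without opening the linked project, here is a self-contained sketch. QuotePresenter, its loadQuote parameter and the shown property are hypothetical stand-ins invented for this example (they are not taken from the linked repository), and the interface is trimmed to the two methods the example needs.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Trimmed-down copy of the interface above, to keep the sketch self-contained.
interface SchedulerProvider {
    fun ui(): Scheduler
    fun io(): Scheduler
}

// Test implementation: everything runs synchronously on the calling thread.
class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler = Schedulers.trampoline()
    override fun io(): Scheduler = Schedulers.trampoline()
}

// Hypothetical class under test; it receives its schedulers via the constructor.
class QuotePresenter(
    private val loadQuote: () -> Single<String>,
    private val schedulers: SchedulerProvider
) {
    var shown: String? = null
        private set

    fun load() {
        loadQuote()
            .subscribeOn(schedulers.io())
            .observeOn(schedulers.ui())
            .subscribe({ shown = it }, { shown = "error: ${it.message}" })
    }
}

fun main() {
    val presenter = QuotePresenter({ Single.just("hello") }, TestSchedulerProvider())
    presenter.load()
    // Because both schedulers are trampolines, the result is available
    // as soon as load() returns; no sleeping or latches are needed.
    check(presenter.shown == "hello")
}
```

In production the same class would receive AppSchedulerProvider instead, so no RxJavaPlugins hooks are involved at all.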

Trilbie answered 10/4, 2017 at 10:30 Comment(3)
Thanks for sharing "trampoline". I used "immediate", but it's not available in Rx2 anymore – Prognosticate
I don't think we need this layer of abstraction. It feels like we are modifying production code significantly just for the sake of testing, when we already have the means to test it without such an abstraction. The answer provided by @Somehow works perfectly. – Hollingshead
I'd have to agree with @Hollingshead here. It seems much more practical to just modify the schedulers during testing with RxJavaPlugin... than to modify production code just for the sake of tests. – Wessex
14

Figured it out! It had to do with the fact that in this code:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

validate(data) was returning an Observable, which was emitting the following: emitter.onNext(null). Since RxJava 2 no longer accepts null values, flatMap was not getting called. I changed validate to return a Completable and updated the scheduler override to the following:

RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }

Now the tests pass!
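
A minimal sketch of that change, for anyone hitting the same thing. The bodies of validate and process below are assumptions made up for illustration (the original validate implementation was not posted); the point is only that a Completable signals success or failure without emitting a value, so no null is needed.

```kotlin
import io.reactivex.Completable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for validate(): completes when the data is valid,
// signals an error otherwise, and never emits a value.
fun validate(data: String): Completable =
    Completable.fromAction {
        require(data.isNotBlank()) { "data must not be blank" }
    }

// Hypothetical follow-up step, chained with andThen instead of flatMap,
// since a Completable has no item to map.
fun process(data: String): Single<Int> =
    validate(data)
        .subscribeOn(Schedulers.io())
        .andThen(Single.fromCallable { data.length })

fun main() {
    // Make Schedulers.io() synchronous for the duration of the checks.
    RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
    try {
        process("abc").test().assertResult(3)
        process(" ").test().assertError(IllegalArgumentException::class.java)
    } finally {
        RxJavaPlugins.reset()
    }
}
```

If the stream really does need to carry a value, a Single or Maybe with a non-null placeholder would be another option.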

Somehow answered 7/4, 2017 at 19:19 Comment(0)
6

As an alternative to proposed solutions, this has been working fine for a while in my projects. You can use it in your test classes like this:

@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()

And the class looks like this:

class ImmediateSchedulersRule : ExternalResource() {

    val immediateScheduler: Scheduler = object : Scheduler() {

        override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })

        // This prevents errors when scheduling a delay
        override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            return super.scheduleDirect(run, 0, unit)
        }

    }

    override fun before() {
        RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }

        RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
        RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
    }

    override fun after() {
        RxJavaPlugins.reset()
        // Also clear the RxAndroid handlers installed in before()
        RxAndroidPlugins.reset()
    }

}

You can find a way to migrate from TestRule to ExternalResource here and get more info on testing RxJava 2 here.
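
To show the rule pattern end to end without the Android dependency, here is a simplified, self-contained variant. TrampolineSchedulersRule and SchedulerRuleTest are hypothetical names for this sketch; it swaps in Schedulers.trampoline() rather than the custom immediate scheduler above, and it does not touch the main-thread handler or the delay handling.

```kotlin
import io.reactivex.Observable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import org.junit.Rule
import org.junit.Test
import org.junit.rules.ExternalResource

// Hypothetical, simplified rule: routes the standard schedulers to the
// trampoline before each test and restores the defaults afterwards.
class TrampolineSchedulersRule : ExternalResource() {
    override fun before() {
        RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
        RxJavaPlugins.setComputationSchedulerHandler { Schedulers.trampoline() }
        RxJavaPlugins.setNewThreadSchedulerHandler { Schedulers.trampoline() }
    }

    override fun after() {
        RxJavaPlugins.reset()
    }
}

class SchedulerRuleTest {
    @get:Rule
    val schedulersRule = TrampolineSchedulersRule()

    @Test
    fun computationWorkCompletesSynchronously() {
        Observable.just(1, 2, 3)
            .subscribeOn(Schedulers.computation())
            .map { it * 2 }
            .test()
            // No awaiting needed: the chain has already finished here.
            .assertResult(2, 4, 6)
    }
}
```

The rule keeps the setup and teardown in one place, so individual test classes only need the two-line @get:Rule declaration.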

Crooks answered 28/6, 2019 at 15:11 Comment(0)
5

This is the exact syntax that worked for me:

RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline())
Inexpedient answered 27/11, 2018 at 12:28 Comment(0)
