How to convert a suspend function to an RX Single (or Completable)?

We're refactoring our project from RX to Kotlin Coroutines, but not in one go, so the project needs to use both for some time.

Right now we have a lot of methods that return an RX Single, like this, because they are heavy, long-running operations such as API calls.

fun foo(): Single<String> // Heavy, long-running operation

We want it to be like this:

suspend fun foo(): String // The same heavy, long-running operation

Where we use this method, we want to still use RX.

We have been doing this:

foo()
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }

Now how should I convert my suspend fun to produce a Single that I can work with?

Is this a good idea?

Single.fromCallable {
    runBlocking {
        foo() // This is now a suspend fun
    }
}
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }
Wearing answered 28/3, 2022 at 11:7 Comment(0)

I haven't taken much time to reason it through, but I think your approach might tie up one additional thread for the duration of the suspend function, because runBlocking blocks the thread it is called on.

There is an official kotlinx-coroutines-rx3 library that has conversions. The rxSingle function is relevant here. Under the hood, it will call the suspend function in a suspending way without blocking an additional thread.

I haven't used this myself, so you may want to test it to be sure, but I don't think you need subscribeOn immediately after this, because the actual work is done in a coroutine. Your foo() function, since it is a suspend function, should already be switching to Dispatchers.IO internally if it performs blocking IO.

rxSingle { foo() }
    .map { ... }
    .subscribe { ... }
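
For something you can paste and run, here is a minimal sketch of the same idea. It assumes the kotlinx-coroutines-rx3 and RxJava 3 artifacts are on the classpath. The bodies of foo() and bar() are hypothetical stand-ins for the real work, and bar() is only there to show the Completable variant from the question title via rxCompletable.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx3.rxCompletable
import kotlinx.coroutines.rx3.rxSingle
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the heavy operation. The blocking part is
// wrapped in withContext(Dispatchers.IO), so callers need no dispatcher.
suspend fun foo(): String = withContext(Dispatchers.IO) {
    Thread.sleep(100) // simulates blocking IO
    "result"
}

// Hypothetical suspend function with no return value.
suspend fun bar() {
    delay(50) // suspends without blocking a thread
}

// Bridges for call sites that are still on RX.
fun fooSingle(): Single<String> = rxSingle { foo() }
fun barCompletable(): Completable = rxCompletable { bar() }

fun main() {
    // blockingGet/blockingAwait are used only to keep the demo short;
    // real call sites would keep using subscribe.
    val length = fooSingle()
        .map { it.length }
        .blockingGet()
    println(length) // prints 6

    barCompletable().blockingAwait()
    println("bar completed")
}
```

As far as I understand the library, disposing the resulting Single or Completable also cancels the underlying coroutine, which the runBlocking version would not give you.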
Jackijackie answered 28/3, 2022 at 11:57 Comment(5)
Thanks, it works! If I want it to use Dispatchers.IO, I need to do this (the default is Dispatchers.Default): rxSingle(Dispatchers.IO) { foo() } – Wearing
What I mean is, since foo() is a suspend function, you don't need to specify a dispatcher to call it. By convention, it's an error to write a suspend function that blocks, so you never have to specify a dispatcher when calling one. – Jackijackie
The rxSingle function accepts a CoroutineDispatcher to specify where you want to run it (otherwise the default is Dispatchers.Default). The suspend function won't block anything if you don't call it from runBlocking {}, but it can run for a long time. I think specifying a dispatcher is justified if you don't want the suspend function to run on Dispatchers.Default, and it is often necessary for unit tests. – Wearing
Can foo() return a Flow? – Goldstone
@Goldstone I see no reason it can't return anything at all. – Jackijackie
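
To make the dispatcher discussion in the comments concrete, here is a small sketch that passes the dispatcher in from outside, so production code can use Dispatchers.IO while a unit test substitutes something else. FooRepository and its constructor parameter are hypothetical names used only for illustration, and Dispatchers.Unconfined is just one possible test substitute.

```kotlin
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx3.rxSingle

// Hypothetical wrapper class. The dispatcher is injected so that tests
// can replace it.
class FooRepository(
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    // Stand-in for the real long-running operation.
    suspend fun foo(): String {
        delay(100)
        return "result"
    }

    // The coroutine starts on `dispatcher`; with no argument, rxSingle
    // would fall back to Dispatchers.Default.
    fun fooSingle(): Single<String> = rxSingle(dispatcher) { foo() }
}

fun main() {
    // Production-style usage with the default Dispatchers.IO.
    println(FooRepository().fooSingle().blockingGet())

    // Test-style usage with a substituted dispatcher.
    println(FooRepository(Dispatchers.Unconfined).fooSingle().blockingGet())
}
```

Both approaches from the comments can coexist: foo() can still switch dispatchers internally with withContext for its blocking parts, and the argument to rxSingle only decides where the coroutine starts.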
