I know RxJava very well and I've recently switched to Kotlin Coroutines and Flow.
RxKotlin is basically the same as RxJava; it just adds some syntactic sugar to make writing RxJava code in Kotlin more comfortable and idiomatic.
A "fair" comparison between RxJava and Kotlin Coroutines should include Flow in the mix and I'm gonna try to explain why here. This is gonna be a bit long but I'll try to keep it as simple as I can with examples.
With RxJava you have different objects (since version 2):
```kotlin
// 0-n events without backpressure management
fun observeEventsA(): Observable<String>

// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable<String>

// exactly 1 event
fun encrypt(original: String): Single<String>

// 0-1 events
fun cached(key: String): Maybe<MyData>

// just completes with no specific results
fun syncPending(): Completable
```
In Kotlin Coroutines + Flow you don't need as many entities, because if you don't have a stream of events you can just use plain coroutines (suspending functions):
```kotlin
// 0-n events, backpressure is taken care of automatically
fun observeEvents(): Flow<String>

// exactly 1 event
suspend fun encrypt(original: String): String

// 0-1 events
suspend fun cached(key: String): MyData?

// just completes with no specific results
suspend fun syncPending()
```
Bonus: Kotlin Flow / Coroutines support null values (support for them was removed in RxJava 2).
Suspending functions do what the name hints: they can pause (suspend) the execution of your code without blocking the thread, and resume it later when the result is ready; this lets you write asynchronous code that reads naturally, like sequential code.
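Here is a minimal runnable sketch of the suspending side of that comparison, assuming kotlinx-coroutines-core is on the classpath. Everything in it is hypothetical: encrypt() just reverses the string (it is NOT real encryption), cached() reads from an in-memory map, String stands in for MyData, and the delay() calls simulate slow work.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory cache standing in for a real data source
private val memoryCache = mapOf("user" to "cached user data")

// exactly 1 result (like Single): just return the value
suspend fun encrypt(original: String): String {
    delay(10) // simulate slow work: the coroutine suspends, no thread is blocked
    return original.reversed() // placeholder, NOT real encryption
}

// 0-1 results (like Maybe): a nullable return type is enough
suspend fun cached(key: String): String? {
    delay(10)
    return memoryCache[key]
}

// completes with no result (like Completable)
suspend fun syncPending() {
    delay(10)
}

// 0-n events: a Flow, which (unlike RxJava 2+) can also carry null values
fun observeEvents(): Flow<String?> = flowOf("first", null, "last")

fun runDemo(): List<Any?> = runBlocking {
    syncPending() // returns only when the "sync" is done
    listOf(
        encrypt("abc"),           // "cba"
        cached("user"),           // "cached user data"
        cached("missing"),        // null: no Maybe needed
        observeEvents().toList(), // [first, null, last]
    )
}

fun main() {
    println(runDemo())
}
```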
What about the operators?
With RxJava you have so many operators (map, filter, flatMap, switchMap, ...), and for most of them there's a version for each entity type (Single.map(), Observable.map(), ...).
Kotlin Coroutines + Flow don't need that many operators; let's see why with some examples of the most common ones:
map()
RxJava:
```kotlin
fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>

fun getPersonName(id: String): Single<String> {
    return getPerson(id)
        .map { it.firstName }
}

fun observePersonsNames(): Observable<String> {
    return observePersons()
        .map { it.firstName }
}
```
Kotlin Coroutines + Flow:
```kotlin
suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>

suspend fun getPersonName(id: String): String {
    return getPerson(id).firstName
}

fun observePersonsNames(): Flow<String> {
    return observePersons()
        .map { it.firstName }
}
```
You do not need an operator for the "single" case, and it is fairly similar for the Flow case.
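To try the map() example end to end, here is a self-contained sketch. The Person class and both data sources are hypothetical stand-ins (getPerson() just waits a bit and builds a Person, observePersons() emits two fixed values), and it assumes kotlinx-coroutines-core on the classpath.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical model and data sources
data class Person(val id: String, val firstName: String)

suspend fun getPerson(id: String): Person {
    delay(10) // pretend this is a database or network call
    return Person(id, "Person $id")
}

fun observePersons(): Flow<Person> = flowOf(Person("1", "Ada"), Person("2", "Linus"))

// "single" case: no operator, just read the property of the result
suspend fun getPersonName(id: String): String = getPerson(id).firstName

// stream case: Flow.map
fun observePersonsNames(): Flow<String> = observePersons().map { it.firstName }

fun main() = runBlocking {
    println(getPersonName("42"))            // Person 42
    println(observePersonsNames().toList()) // [Ada, Linus]
}
```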
flatMap()
The flatMap operator and its siblings switchMap and concatMap exist to let you combine different RxJava objects, and thus execute potentially asynchronous code while mapping your events.
Say you need, for each person, to grab its insurance from a database (or a remote service).
RxJava:
```kotlin
fun fetchInsurance(insuranceId: String): Single<Insurance>

fun getPersonInsurance(id: String): Single<Insurance> {
    return getPerson(id)
        .flatMap { person ->
            fetchInsurance(person.insuranceId)
        }
}

fun observePersonsInsurances(): Observable<Insurance> {
    return observePersons()
        .flatMap { person ->
            fetchInsurance(person.insuranceId) // this is a Single
                .toObservable() // flatMap expects an Observable
        }
}
```
Let's see the same with Kotlin Coroutines + Flow:
```kotlin
suspend fun fetchInsurance(insuranceId: String): Insurance

suspend fun getPersonInsurance(id: String): Insurance {
    val person = getPerson(id)
    return fetchInsurance(person.insuranceId)
}

fun observePersonsInsurances(): Flow<Insurance> {
    return observePersons()
        .map { person ->
            fetchInsurance(person.insuranceId)
        }
}
```
Like before, in the simple coroutine case we don't need operators: we just write the code as we would if it wasn't async, using suspending functions.
And with Flow, that is NOT a typo: there's no need for a flatMap operator, we can just use map. The reason is that the map lambda is a suspending function! We can execute suspending code in it!!! We don't need another operator just for that.
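A runnable sketch of the insurance example, showing map calling a suspending function where RxJava would need flatMap. Person, Insurance and fetchInsurance() are hypothetical (the "remote call" is just a delay), and kotlinx-coroutines-core is assumed on the classpath.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

data class Person(val name: String, val insuranceId: String) // hypothetical
data class Insurance(val id: String)                         // hypothetical

// Hypothetical remote call: suspends instead of returning a Single
suspend fun fetchInsurance(insuranceId: String): Insurance {
    delay(20)
    return Insurance(insuranceId)
}

fun observePersons(): Flow<Person> =
    flowOf(Person("Ada", "ins-1"), Person("Linus", "ins-2"))

// No flatMap needed: the map lambda is a suspending function
fun observePersonsInsurances(): Flow<Insurance> =
    observePersons().map { person -> fetchInsurance(person.insuranceId) }

fun main() = runBlocking {
    println(observePersonsInsurances().toList()) // [Insurance(id=ins-1), Insurance(id=ins-2)]
}
```

Because map waits for each fetchInsurance() call before handling the next person, the output order always matches the input order.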
I cheated a bit here
Rx flatMap, switchMap and concatMap behave slightly differently.
- Rx flatMap generates a new stream for each event and then merges them all together: the order of the events you receive in the output is undetermined, and it might not match the order of the events in input
- Rx concatMap "fixes" that and guarantees you will get each new stream in the same order as your input events
- Rx switchMap instead disposes of any previously running stream when it gets a new event: only the last input received matters with this operator
So you see, it isn't true that Flow.map is the same: it is actually more similar to Rx concatMap, which is the more natural behavior you'd expect from a map operator.
But it is true that you need fewer operators: inside map you can do any async operation you want, because its lambda is a suspending function. The actual equivalent of RxJava flatMap is the Flow.flatMapMerge operator, while Flow.flatMapConcat behaves like concatMap.
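The closest equivalent of RxJava switchMap is Flow.flatMapLatest (or mapLatest / transformLatest when you don't need to return a Flow): when a new event arrives, the work still running for the previous one is cancelled. Putting the conflate() operator before map only gives a similar effect: the running work is not cancelled, but the events that arrive in the meantime are dropped, except the most recent one.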
For more complex stuff you can use the Flow transform() operator, which lets you emit any number of values (including none) for every event, calling any suspending function you need.
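A small sketch comparing these Flow operators on the same input, assuming a recent kotlinx-coroutines-core; depending on the library version, flatMapConcat, flatMapMerge and flatMapLatest may require an opt-in, hence the @OptIn. lookup() is a hypothetical inner stream in which later inputs finish sooner, so the different behaviors become visible.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical inner stream: input 1 takes 30ms, 2 takes 20ms, 3 takes 10ms
fun lookup(n: Int): Flow<String> = flow {
    delay(10L * (4 - n))
    emit("result $n")
}

val inputs: Flow<Int> = flowOf(1, 2, 3)

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun compareOperators(): Map<String, List<String>> = runBlocking {
    mapOf(
        // like Rx concatMap: one inner flow at a time, input order preserved
        "flatMapConcat" to inputs.flatMapConcat { lookup(it) }.toList(),
        // like Rx flatMap: inner flows run concurrently, output order depends on timing
        "flatMapMerge" to inputs.flatMapMerge { lookup(it) }.toList(),
        // like Rx switchMap: a new input cancels the inner flow of the previous one
        "flatMapLatest" to inputs.flatMapLatest { lookup(it) }.toList(),
        // transform: emit zero or more values per input
        "transform" to inputs.transform<Int, String> { n ->
            if (n != 2) emit("n=$n")
        }.toList(),
    )
}

fun main() = compareOperators().forEach { (name, results) -> println("$name: $results") }
```

flatMapMerge usually returns the results in completion order (result 3 first), but that order depends on timing, which is exactly the Rx flatMap caveat described above.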
Every Flow operator accepts a suspending function!
In the previous paragraph I told you I cheated. But the key takeaway of what I meant by "Flow doesn't need as many operators" is that most operators' callbacks are suspending functions.
So say you need to filter(), but your filter needs to perform a network call to know whether to keep the value or not: with RxJava you need to combine multiple operators into unreadable code, with Flow you can just use filter()!
```kotlin
fun observePersonsWithValidInsurance(): Flow<Person> {
    return observePersons()
        .filter { person ->
            val insurance = fetchInsurance(person.insuranceId) // suspending call
            insurance.isValid()
        }
}
```
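A runnable version of the filter() example, assuming kotlinx-coroutines-core. The models and fetchInsurance() are hypothetical: here an insurance id ending in "-old" is treated as expired, and the network call is simulated with a delay.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical models
data class Person(val name: String, val insuranceId: String)
data class Insurance(val id: String, val expired: Boolean) {
    fun isValid(): Boolean = !expired
}

// Hypothetical network call
suspend fun fetchInsurance(insuranceId: String): Insurance {
    delay(10)
    return Insurance(insuranceId, expired = insuranceId.endsWith("-old"))
}

fun observePersons(): Flow<Person> = flowOf(
    Person("Ada", "ins-1"),
    Person("Linus", "ins-2-old"),
    Person("Grace", "ins-3"),
)

// filter's predicate is a suspending function, so it can call the "network"
fun observePersonsWithValidInsurance(): Flow<Person> =
    observePersons().filter { person ->
        fetchInsurance(person.insuranceId).isValid()
    }

fun main() = runBlocking {
    println(observePersonsWithValidInsurance().map { it.name }.toList()) // [Ada, Grace]
}
```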
delay(), startWith(), concatWith(), ...
In RxJava you have many operators for applying delay or adding items before and after:
- delay()
- delaySubscription()
- startWith(T)
- startWith(Observable)
- concatWith(...)
With Kotlin Flow you can simply write:
```kotlin
grabMyFlow()
    .onStart {
        // delay by 3 seconds before starting
        delay(3000L)
        // emit an item first
        emit("First item!")
        emit(cachedItem()) // call another suspending function and emit the result
    }
    .onEach { value ->
        // delay a value by 1 second before passing it downstream, only on some condition
        if (value.length > 5) {
            delay(1000L)
        }
    }
    .onCompletion { cause ->
        // append an ending sequence, only if the flow completed successfully
        if (cause == null) {
            val endingSequence: Flow<String> = grabEndingSequence()
            emitAll(endingSequence)
        }
    }
```
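A self-contained version of the snippet above, with hypothetical grabMyFlow(), cachedItem() and grabEndingSequence() implementations and shorter delays, to show the order in which values come out (assumes kotlinx-coroutines-core):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the functions used above
fun grabMyFlow(): Flow<String> = flowOf("short", "a longer value")
suspend fun cachedItem(): String { delay(5); return "cached" }
fun grabEndingSequence(): Flow<String> = flowOf("bye", "for now")

fun decorated(): Flow<String> = grabMyFlow()
    .onStart {
        delay(30)              // like delaySubscription()
        emit("First item!")    // like startWith(T)
        emit(cachedItem())     // startWith(...) computed by a suspending call
    }
    .onEach { value ->
        if (value.length > 5) delay(10) // conditional delay on some values
    }
    .onCompletion { cause ->
        // like concatWith(): only append when the flow completed successfully
        if (cause == null) emitAll(grabEndingSequence())
    }

fun main() = runBlocking {
    println(decorated().toList()) // [First item!, cached, short, a longer value, bye, for now]
}
```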
error handling
RxJava has a lot of operators to handle errors:
- onErrorResumeWith()
- onErrorReturn()
- onErrorComplete()
With Flow you don't need much more than the catch() operator:
```kotlin
grabMyFlow()
    .catch { error ->
        // emit something from the flow
        emit("We got an error: ${error.message}")
        // then, if we can recover from this error, emit the recovery values
        if (error is RecoverableError) {
            // error.recover() here is supposed to return a Flow<> to recover
            emitAll(error.recover())
        } else {
            // re-throw the error if we can't recover (i.e. don't catch it)
            throw error
        }
    }
```
And with suspending functions you can just use try { } catch { }.
You can achieve ALL the RxJava error operators with a single catch operator, because its lambda is a suspending function.
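Here is a runnable sketch of both approaches, assuming kotlinx-coroutines-core. RecoverableError, its recover() function, the failing source flow and loadOrDefault() are hypothetical, defined only for this example; catch, emit and emitAll are the real Flow APIs. Note that loadOrDefault() catches a specific exception type (IOException) rather than Exception, so cancellation is not swallowed.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical error type that knows how to recover (not a library class)
class RecoverableError(message: String) : Exception(message) {
    fun recover(): Flow<String> = flowOf("recovered value")
}

// Hypothetical source: emits one value, then fails with the given error
fun grabMyFlow(error: Exception): Flow<String> = flow {
    emit("first value")
    throw error
}

fun handled(error: Exception): Flow<String> = grabMyFlow(error)
    .catch { e ->
        emit("We got an error: ${e.message}") // like onErrorReturn
        if (e is RecoverableError) {
            emitAll(e.recover())              // like onErrorResumeWith
        } else {
            throw e                           // not handled: rethrow downstream
        }
    }

// With plain suspending code, try/catch is enough
suspend fun loadOrDefault(load: suspend () -> String): String =
    try {
        load()
    } catch (e: IOException) {
        "default"
    }

fun main() = runBlocking {
    println(handled(RecoverableError("timeout")).toList())
    // [first value, We got an error: timeout, recovered value]
    try {
        handled(IllegalArgumentException("bad")).toList()
    } catch (e: IllegalArgumentException) {
        println("rethrown: ${e.message}") // rethrown: bad
    }
    println(loadOrDefault { throw IOException("offline") }) // default
}
```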
easy to write Flow operators
Because Flow is powered by coroutines under the hood, it is way easier to write operators. If you have ever looked at the implementation of an RxJava operator, you've seen how hard it is and how many things you need to learn.
Writing Kotlin Flow operators is easier: you can get an idea just by looking at the source code of the operators that are already part of Flow here. The reason is that coroutines make it easier to write async code, and operators just feel more natural to use.
As a bonus, Flow operators are all Kotlin extension functions, which means either you or libraries can easily add operators and they will not feel weird to use (in RxJava, observable.lift() or observable.compose() are needed to combine custom operators).
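As an illustration, here is a hypothetical everyNth() operator (not part of the library), written as a plain extension function with the flow { } builder. It's only a sketch, but it shows there are no subscriptions, queues or disposables to manage by hand:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical custom operator: keep only every n-th element (the 1st, the (n+1)-th, ...)
fun <T> Flow<T>.everyNth(n: Int): Flow<T> {
    require(n > 0) { "n must be positive" }
    return flow {
        var index = 0
        collect { value ->
            if (index % n == 0) emit(value) // emit into the downstream collector
            index++
        }
    }
}

fun main() = runBlocking {
    // used exactly like a built-in operator, no lift()/compose() needed
    println(flowOf(1, 2, 3, 4, 5, 6, 7).everyNth(3).toList()) // [1, 4, 7]
}
```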
Upstream thread doesn't leak downstream
What does this even mean?
This explains why in RxJava you have subscribeOn() and observeOn(), while in Flow you only have flowOn().
Let's take this RxJava example:
```kotlin
urlsToCall()
    .switchMap { url ->
        if (url.scheme == "local") {
            val data = grabFromMemory(url.path)
            Observable.just(data)
        } else {
            performNetworkCall(url)
                .subscribeOn(Schedulers.io())
                .toObservable()
        }
    }
    .subscribe {
        // in which thread is this call executed?
    }
```
So where is the callback in subscribe executed?
The answer is: it depends... If it comes from the network, it's on an IO thread; if it comes from the other branch, it is undefined: it depends on which thread is used to emit the url.
If you think about it, for any code you write you don't know in which thread it is gonna be executed: it always depends on the caller. The issue here is that the thread doesn't depend on the caller anymore, it depends on what an internal function call does.
Suppose you have this plain, standard code:
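```kotlin
// Data: hypothetical type returned by both grabFromMemory() and performNetworkCall()
fun callUrl(url: Uri): Data {
    val callResult = if (url.scheme == "local") {
        grabFromMemory(url.path)
    } else {
        performNetworkCall(url)
    }
    return callResult
}
```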
Imagine not having a way of knowing in which thread the line return callResult is executed without looking inside grabFromMemory() and performNetworkCall().
Think about that for a second: the thread changes based on which function you call and what it does inside.
This happens all the time with callback APIs: you have no way of knowing in which thread the callback you provide will be executed, unless it's documented.
This is the concept of "upstream thread leaking downstream".
With Flow and Coroutines this is not the case, unless you explicitly require this behavior (using Dispatchers.Unconfined).
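```kotlin
suspend fun myFunction() {
    // execute this coroutine body in the main thread
    withContext(Dispatchers.Main) {
        urlsToCall()
            // like switchMap: a new url cancels the work still running for the previous one
            // (transformLatest is marked experimental in some kotlinx.coroutines versions)
            .transformLatest { url ->
                if (url.scheme == "local") {
                    val data = grabFromMemory(url.path)
                    emit(data)
                } else {
                    // only the network call runs on the IO dispatcher;
                    // emit() must be called outside withContext, in the collector's context
                    val data = withContext(Dispatchers.IO) {
                        performNetworkCall(url)
                    }
                    emit(data)
                }
            }
            .collect { data ->
                // this will always execute in the main thread
                // because this is where we collect,
                // inside withContext(Dispatchers.Main)
            }
    }
}
```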
Coroutine code runs in the context it has been launched in, so only the network call runs on an IO thread, while everything else we see here runs on the main thread.
Well, actually, we don't know where the code inside grabFromMemory() runs, but we don't care: we know it is called from the main thread. Inside that suspending function another Dispatcher could be used, but when it gets back with the result (val data) we are again on the main thread.
Which means that, looking at a piece of code, it's easier to tell in which thread it will run: if you see an explicit Dispatcher, it's that dispatcher; if you don't, it's whatever thread (dispatcher) the suspending code you are looking at is being called from.
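To see this in practice, here is a sketch that records which thread each part runs on. Dispatchers.Main needs a UI platform (Android, Swing, JavaFX), so this assumes a hypothetical single-thread dispatcher named "fake-main" in its place; the flow and the url value are placeholders too.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun threadReport(): List<String> {
    // Hypothetical stand-in for Dispatchers.Main: one thread named "fake-main"
    val fakeMain = Executors.newSingleThreadExecutor { task -> Thread(task, "fake-main") }
        .asCoroutineDispatcher()
    try {
        // runBlocking with an explicit dispatcher runs the coroutine on that dispatcher
        return runBlocking(fakeMain) {
            val report = mutableListOf<String>()
            flowOf("url")
                .map { url ->
                    report += "map: ${Thread.currentThread().name}"
                    // only this block runs on the IO dispatcher
                    withContext(Dispatchers.IO) {
                        report += "io: ${Thread.currentThread().name}"
                        "result for $url"
                    }
                }
                .collect {
                    report += "collect: ${Thread.currentThread().name}"
                }
            report
        }
    } finally {
        fakeMain.close() // shuts down the executor thread
    }
}

fun main() {
    threadReport().forEach(::println)
}
```

Expect map and collect on fake-main, and the withContext block on one of the shared IO worker threads (their exact names are an implementation detail).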
Structured Concurrency
This is not a concept invented by Kotlin, but it is something they embraced more than any other language I know of.
If what I explain here is not enough for you, read this article or watch this video.
So what is it?
With RxJava you subscribe to observables, and they give you a Disposable object.
You need to take care of disposing of it when it's not needed anymore. So what you usually do is keep a reference to it (or put it in a CompositeDisposable) to later call dispose() on it when it's not needed anymore. If you don't, the linter will give you a warning.
RxJava is somewhat nicer than a traditional thread. When you create a new thread and execute something on it, it's "fire and forget": you don't even get a proper way to cancel it. Thread.stop() is deprecated and harmful, and recent implementations don't even support it anymore (they throw UnsupportedOperationException); Thread.interrupt() only works if the code running on the thread cooperates. Any exceptions get lost... you get the picture.
With Kotlin coroutines and Flow they reverse the "Disposable" concept. You CANNOT launch a coroutine without a scope (a CoroutineScope, which carries a CoroutineContext).
This context defines the scope of your coroutine. Every child coroutine spawned inside that one will share the same scope.
If you subscribe to a flow you have to be inside a coroutine or provide a scope too.
You can still keep a reference to the coroutines you start (Job) and cancel them. This automatically cancels every child of that coroutine.
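A minimal sketch of that cascade, assuming kotlinx-coroutines-core: the three children are hypothetical never-ending tasks, and cancelling only the parent Job is enough to stop all of them.

```kotlin
import kotlinx.coroutines.*

fun cancelParentDemo(): List<Boolean> = runBlocking {
    // UNDISPATCHED: run the parent body immediately, so its children exist right away
    val parent = launch(start = CoroutineStart.UNDISPATCHED) {
        repeat(3) {
            launch { delay(Long.MAX_VALUE) } // hypothetical never-ending work
        }
    }
    val children = parent.children.toList()
    parent.cancelAndJoin()          // cancelling the parent Job...
    children.map { it.isCancelled } // ...cancelled all of its children too
}

fun main() {
    println(cancelParentDemo()) // [true, true, true]
}
```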
If you are an Android developer, these scopes are provided for you. For example, with viewModelScope you can launch coroutines inside a ViewModel knowing they will automatically be cancelled when the ViewModel is cleared.
```kotlin
viewModelScope.launch {
    // my coroutine here
}
```
Some scopes terminate if any child fails; others let each child live its own lifecycle without stopping the other children when one fails (SupervisorJob).
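The difference can be sketched like this, assuming kotlinx-coroutines-core (ChildFailure is a hypothetical exception defined for the demo): with supervisorScope the slow sibling still finishes after the other child fails, while with coroutineScope it gets cancelled and the failure is rethrown to the caller.

```kotlin
import kotlinx.coroutines.*

// Hypothetical failure used only for this demo
class ChildFailure : Exception("child failed")

// Runs a slow sibling and a failing child inside a supervisorScope or a
// coroutineScope, and reports whether the slow sibling managed to finish.
suspend fun siblingFinished(useSupervisor: Boolean): Boolean {
    var finished = false
    // only consulted for direct children of a supervisor; here it just logs
    val logFailure = CoroutineExceptionHandler { _, e -> println("logged: ${e.message}") }
    val body: suspend CoroutineScope.() -> Unit = {
        launch { delay(50); finished = true }
        launch(logFailure) { throw ChildFailure() }
    }
    try {
        if (useSupervisor) supervisorScope(body) else coroutineScope(body)
    } catch (e: ChildFailure) {
        println("coroutineScope rethrew: ${e.message}")
    }
    return finished
}

fun main() = runBlocking {
    println("supervisorScope, sibling finished: ${siblingFinished(useSupervisor = true)}")
    println("coroutineScope, sibling finished: ${siblingFinished(useSupervisor = false)}")
}
```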
Why is this a good thing?
Let me try to explain it like Roman Elizarov did.
Some old programming languages had this concept of goto, which basically let you jump from one line of code to another at will.
Very powerful, but if abused you could end up with very hard-to-understand code, difficult to debug and reason upon.
So new programming languages eventually completely removed it from the language.
When you use if, while or when, it is way easier to reason about the code: no matter what happens inside those blocks, you'll eventually come out of them. It's a "context": you don't have weird jumps in and out.
Launching a thread or subscribing to an RxJava observable is similar to goto: you are executing code that will keep going until it is stopped "elsewhere".
With coroutines, since you must provide a context/scope, you know that when your scope is over, everything inside it is over too (completed or cancelled), no matter whether you launched a single coroutine or 10 thousand.
You can still "goto" with coroutines by using GlobalScope, which you shouldn't, for the same reason you shouldn't use goto in languages that provide it.
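A tiny sketch of that guarantee, assuming kotlinx-coroutines-core: coroutineScope does not return until every coroutine launched inside it has finished, whether that's one or 10 thousand (the delay() is hypothetical work).

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

fun launchMany(count: Int): Int = runBlocking {
    val completed = AtomicInteger(0)
    // coroutineScope only returns once every coroutine launched inside it is done
    coroutineScope {
        repeat(count) {
            launch(Dispatchers.Default) {
                delay(10) // hypothetical work
                completed.incrementAndGet()
            }
        }
    }
    completed.get() // always equals count here
}

fun main() {
    println(launchMany(10_000)) // 10000
}
```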
Cold vs Hot - SharedFlow and StateFlow
When we work with reactive streams we always have this concept of Cold and Hot streams. These concepts exist both in the Rx world and in Kotlin Flow.
Cold streams are just like a function in our code: it's there and does nothing until you call it. With a Flow, that means it defines what the stream does, but it does nothing until you start to collect it. And, like a function, if you collect (call) it twice the stream runs twice (e.g. a cold stream performing an HTTP request will execute the request twice if collected twice).
Hot streams do not work like that. When you have multiple collect calls on them they all share the same Hot stream under the hood, which means your hot streams run once and you can have multiple observers.
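To see the "function" behavior of a cold stream, here is a sketch where a hypothetical flow counts how many times its body runs, standing in for an HTTP request (assumes kotlinx-coroutines-core):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun collectColdTwice(): Int = runBlocking {
    var requests = 0
    // hypothetical cold stream standing in for an HTTP request
    val request = flow {
        requests++ // "perform" the request
        emit("response #$requests")
    }
    println(request.toList()) // [response #1]
    println(request.toList()) // [response #2]: collecting again runs the flow again
    requests
}

fun main() {
    println("requests performed: ${collectColdTwice()}") // requests performed: 2
}
```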
You can usually turn a Cold stream into a Hot stream with some operator.
In RxJava you can use ConnectableObservable / ConnectableFlowable for this.
```kotlin
val coldObservable: Observable<Something> = buildColdObservable()

// create a hot observable from the cold one
val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()

// you can subscribe multiple times to this connectable
val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)

// but nothing will be emitted until you call
val hotDisposable: Disposable = connectableObservable.connect()
// which actually runs the cold observable and shares its events with both subscriberA and subscriberB

// while it's active another one can start listening to it
val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)
```
You then have other helpful operators like refCount() or autoConnect(), which turn the Connectable back into a standard stream and, under the hood, automatically .connect() when the first subscriber is attached.
```kotlin
buildColdObservable()
    .replay(1) // when a new subscriber is attached it instantly receives the last event
    .autoConnect() // connect on the first subscriber; keeps running even when all subscribers leave
```
In Flow you have the shareIn() and stateIn() operators. You can see the API design here. They are less "manual" about when you "connect".
```kotlin
buildColdFlow()
    .shareIn(
        // you need to specify a scope for the cold flow subscription
        scope = myScope,
        // when to "connect"
        started = SharingStarted.WhileSubscribed(),
        // how many events already emitted should be sent to new subscribers
        replay = 1,
    )
```
scope
The scope is for structured concurrency. In RxJava it's the connect() operation that actually subscribes to the cold observable; it gives you a Disposable you will have to call .dispose() on somewhere. If you use refCount() or autoConnect(), the connection happens on the first subscriber: with refCount() it is disposed when there aren't any more subscribers, while with autoConnect() it is never disposed automatically (an autoConnect() overload hands you the Disposable through a callback).
With Flow you need to give a dedicated scope to collect the cold stream: if you cancel that scope, the cold stream stops being collected and the shared flow won't emit anything new.
started
So this one is easy
- RxJava refCount() -> Flow SharingStarted.WhileSubscribed(): starts collecting on the first subscriber and cancels it when there aren't any more
- RxJava autoConnect() -> Flow SharingStarted.Lazily: starts collecting on the first subscriber and keeps collecting even when all subscribers are gone
- RxJava calling connect() manually before any subscription -> Flow SharingStarted.Eagerly: starts collecting immediately
WhileSubscribed() has useful parameters (stopTimeoutMillis, replayExpirationMillis), check them out.
You can also implement your own SharingStarted to decide when to start and stop collecting from the cold Flow.
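Here is a runnable sketch of the sharing behavior, assuming a recent kotlinx-coroutines-core: two subscribers share one execution of a hypothetical cold flow. SharingStarted.Lazily plays the autoConnect() role, and replay = 3 makes sure a subscriber that arrives late still sees all three values.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun shareDemo(): Pair<Int, List<List<Int>>> = runBlocking {
    var upstreamRuns = 0
    val cold = flow {
        upstreamRuns++ // counts how many times the cold flow is collected
        emit(1); emit(2); emit(3)
    }
    // dedicated scope for the sharing coroutine (structured concurrency)
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val hot = cold.shareIn(
        scope = sharingScope,
        started = SharingStarted.Lazily, // like autoConnect(): start on the first subscriber
        replay = 3,                      // late subscribers still get all 3 values
    )
    // a shared flow never completes, so each subscriber takes 3 values
    val first = async { hot.take(3).toList() }
    val second = async { hot.take(3).toList() }
    val results = listOf(first.await(), second.await())
    sharingScope.cancel() // stop the sharing coroutine
    upstreamRuns to results
}

fun main() {
    val (runs, results) = shareDemo()
    println("upstream runs: $runs, results: $results") // upstream runs: 1, results: [[1, 2, 3], [1, 2, 3]]
}
```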
Behavior and backpressure
When you have a hot observable you always have backpressure issues to deal with: one source of data listened to by many means one listener can be slower than the others.
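Flow shareIn collects the cold stream in a dedicated coroutine and buffers emissions by default: if the cold stream emits faster than a subscriber can collect, the values are kept in the buffer, and the cold stream is suspended when the buffer is full. You can change this behavior (for example with buffer() or conflate() before shareIn).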
Kotlin SharedFlow also lets you access the replay cache directly (replayCache) to inspect previous emissions if you need to.
Canceling a subscriber has no effect on the shared flow itself (besides what your SharingStarted policy does when the number of subscribers changes).
Using flowOn() to change the Dispatcher on the subscriber side has no effect on the shared flow (use flowOn() before sharing if you need to run the cold stream on some specific dispatcher).
stateIn
Flow has a "special" version of SharedFlow called StateFlow, and you can use stateIn() to create one from another stream.
A StateFlow always has 1 value, it cannot be "empty", so you need to provide the initial value when you call stateIn() (or use the suspending stateIn(scope) overload, which waits for the first value).
A StateFlow can never throw exceptions and can never terminate (in this way it is similar to BehaviorRelay in the RxRelay library).
A StateFlow only emits when the state changes (it's like it has a built-in distinctUntilChanged()).
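A small sketch of that built-in distinctUntilChanged(), using a MutableStateFlow as the StateFlow (assumes kotlinx-coroutines-core). The collector subscribes right away and, after each update, yield() gives it a chance to run; without that, a slow collector could also skip intermediate values, because a StateFlow only keeps the latest one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun observedStates(): List<Int> = runBlocking {
    val state = MutableStateFlow(0) // a StateFlow always has a value
    val seen = mutableListOf<Int>()
    // UNDISPATCHED: subscribe right away, so the collector receives the current value 0
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }
    for (value in listOf(1, 1, 2, 2, 2, 3)) {
        state.value = value // setting an equal value is a no-op
        yield()             // let the collector run before the next update
    }
    collector.cancel() // a StateFlow never completes: stop collecting explicitly
    seen
}

fun main() {
    println(observedStates()) // [0, 1, 2, 3]
}
```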
RxJava Subjects vs Mutable*Flow
A Subject in RxJava is a class that you can use to manually push your data into, while still using it as a stream.
In Flow you can use MutableSharedFlow or MutableStateFlow to achieve a similar effect.
With Kotlin coroutines you can also use Channels, but they are considered a somewhat lower-level API.
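For comparison, a sketch of a MutableSharedFlow used like an RxJava PublishSubject (no replay, no extra buffer), with hypothetical event names and assuming kotlinx-coroutines-core. Unlike a StateFlow, it delivers equal consecutive values, and emit() suspends until the subscriber has received each value:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun receivedEvents(): List<String> = runBlocking {
    // like a PublishSubject: no replay, every emitted event is delivered
    val events = MutableSharedFlow<String>()
    val received = mutableListOf<String>()
    // subscribe right away, so no event is emitted before someone listens
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { received += it }
    }
    events.emit("click")
    events.emit("click") // unlike StateFlow, equal values are not dropped
    events.emit("scroll")
    subscriber.cancel()
    received
}

fun main() {
    println(receivedEvents()) // [click, click, scroll]
}
```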
Any Drawback?
Flow is still in development, and some features available in RxJava might be marked experimental in Kotlin Coroutines Flow or differ here and there.
Some niche operators might not be implemented yet and you might have to write them yourself (at least it's easier).
But other than that there aren't any drawbacks I know of.
However, there are differences to be aware of that could cause some friction when switching from RxJava and require you to learn new things.
Structured concurrency is a step forward, but it introduces new concepts you need to learn and get used to (scopes, SupervisorJob): cancellation is handled completely differently.
There are some gotchas to be aware of.
Gotcha: Cancellation Exception
If you cancel() a Job, or a coroutine throws CancellationException, only that coroutine and its children are cancelled: the parent and the siblings keep running. Any other exception, instead, is propagated to the parent coroutine unless you used a supervisor scope / job, and the parent then also cancels the siblings of the coroutine that failed.
BUT if you catch(e: Exception), even using runCatching {}, you must remember to rethrow CancellationException, otherwise you'll get unexpected results, because the coroutine has been cancelled but your code keeps executing as if it wasn't.
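A common way to handle this is a helper like the hypothetical runSuspendCatching() below (not part of kotlinx.coroutines): it behaves like runCatching but rethrows CancellationException. The demo shows a polling loop that stops promptly when cancelled; with plain runCatching, the CancellationException thrown by delay() would be swallowed and the loop would spin forever.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: like runCatching, but never swallows CancellationException
suspend fun <T> runSuspendCatching(block: suspend () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // the coroutine was cancelled: let it stop
    } catch (e: Exception) {
        Result.failure(e)
    }

fun pollingStopsWhenCancelled(): Boolean = runBlocking {
    var attempts = 0
    val poller = launch {
        while (true) {
            runSuspendCatching { delay(10) } // hypothetical polling work
            attempts++
        }
    }
    delay(55)
    poller.cancelAndJoin() // returns because the cancellation is not swallowed
    poller.isCancelled && attempts > 0
}

fun main() {
    println(pollingStopsWhenCancelled()) // true
}
```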
Gotcha: UncaughtExceptionHandler
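If you do launch { ... } to create a new coroutine and that coroutine throws, the exception doesn't reach the code that called launch: it goes to the scope's CoroutineExceptionHandler or, if there is none, to the thread's uncaught exception handler. Depending on how those are set up, this might not crash the app, and you might completely miss that something went wrong.
Depending on the scope and handlers it runs with, this code might not crash your app: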
```kotlin
launch {
    throw RuntimeException()
}
```
In some cases it might not even print anything in the log.
If it was a cancellation exception it will definitely NOT print anything in the log.
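One way to avoid missing these failures is to install a CoroutineExceptionHandler in your long-lived scopes (the handler is never called for CancellationException). A minimal sketch, assuming kotlinx-coroutines-core, where appScope is a hypothetical application-level scope and println stands in for your real logger or crash reporter:

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*

fun reportedFailure(): String? = runBlocking {
    val lastError = AtomicReference<Throwable?>(null)
    // make failures of launch { } visible instead of losing them
    val handler = CoroutineExceptionHandler { _, error ->
        lastError.set(error)
        println("Uncaught coroutine failure: $error") // replace with your logger
    }
    // hypothetical app-level scope: SupervisorJob so one failure doesn't cancel everything
    val appScope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)
    appScope.launch { throw RuntimeException("boom") }.join()
    appScope.cancel()
    lastError.get()?.message
}

fun main() {
    println(reportedFailure()) // boom
}
```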