Kotlin Flow execute two API calls in parallel and collect each result as it arrives

I am trying to implement a cache-then-network strategy for my API call using Kotlin Flows. Here is what I am trying right now:

flowOf(
    remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
        .catch { error -> Timber.e(error) },
    remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
).flattenConcat().collect {
    Timber.i("Response Received")
}

The problem here is that collect is only called when getDataFromServer returns. My expectation is that I should get the first event from the cache and then a second event from the server a few milliseconds later. Instead, "Response Received" gets printed twice, one immediately after the other.

In this other variant, "Response Received" only gets printed once, after getDataFromServer() returns.

remoteDataSource.getDataFromCache() // suspending function returning Flow<Data>
    .catch { error -> Timber.e(error) }
    .flatMapConcat {
        remoteDataSource.getDataFromServer() // suspending function returning Flow<Data>
    }
    .collect {
        Timber.i("Response Received")
    }

I was using RxJava's Flowable.concat() before and it was working perfectly. Is there something in Kotlin Flows which can emulate that behaviour?

Pulverulent answered 12/2, 2020 at 17:48 Comment(0)
C
8

Problem here is collect is only called when getDataFromServer returns.

The first problematic thing with your design is that the Flow-returning function is also suspendable. That's two layers of suspendability. Functions should return flows without any delays and the flows themselves should emit items as they come in. If you followed this guideline, your initial code would already work.
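To illustrate that guideline, here is a minimal sketch. The names readCache and callServer are hypothetical stand-ins for the real lookups, and the delays are made up. The Flow-returning functions are not suspend; the suspending work sits inside the flow builder, so it only runs when the flow is collected:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the real cache and network lookups.
suspend fun readCache(): String { delay(10); return "cached" }
suspend fun callServer(): String { delay(100); return "fresh" }

// Plain (non-suspend) functions: each returns a cold flow immediately.
fun getCached(): Flow<String> = flow { emit(readCache()) }
fun getFromServer(): Flow<String> = flow { emit(callServer()) }

// The opt-in lists both annotations because the one placed on flattenConcat
// has changed between library versions.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun cacheThenServer(): Flow<String> =
    flowOf(getCached(), getFromServer()).flattenConcat()

fun main(): Unit = runBlocking {
    // The cached value is emitted first, the server value once it arrives.
    println(cacheThenServer().toList())
}
```

With this shape, flowOf receives two flows that have not started any work yet, so the cached item can reach the collector before the server call has even begun.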

The way you wrote these functions, they can still work if you write this:

flow<String> {
    emitAll(getCached())
    emitAll(getFromServer())
}

This statement completes immediately, returning a cold flow. When you call collect on it, it first calls getCached() and emits the cached value, and then calls getFromServer() and emits the server response.
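The order of events described above can be traced with a small sketch. The two suspending functions below are hypothetical imitations of the ones in the question (suspend and returning a Flow), and the log parameter exists only to record what happens when:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical suspending, Flow-returning functions shaped like the question's.
suspend fun getCached(log: MutableList<String>): Flow<String> {
    log += "cache requested"
    delay(10)
    return flowOf("cached")
}

suspend fun getFromServer(log: MutableList<String>): Flow<String> {
    log += "server requested"
    delay(50)
    return flowOf("fresh")
}

// Building the flow calls neither function; they run inside the builder on collect.
fun cacheThenServer(log: MutableList<String>): Flow<String> = flow {
    emitAll(getCached(log))
    emitAll(getFromServer(log))
}

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()
    val combined = cacheThenServer(log)
    log += "flow built" // nothing has been requested at this point
    combined.collect { log += "received $it" }
    log.forEach { println(it) }
    // flow built
    // cache requested
    // received cached
    // server requested
    // received fresh
}
```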


The above solution starts the server call only after you consume the cached value. If you need the two flows to be active concurrently, use flattenMerge.

Assuming you fixed the above basic problem and made your Flow-returning functions non-suspending, all you need is this:

flowOf(getCached(), getFromServer()).flattenMerge()

If for some reason you can't do that, you have to add the emitAll wrapper around each call:

flowOf(
    flow { emitAll(getCached()) }, 
    flow { emitAll(getFromServer()) }
).flattenMerge()
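A rough way to see the concurrency is to log when each inner flow starts. In this sketch the sources, the delays, and the log parameter are all hypothetical; the point is only that both flows should start before the first value is received:

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical non-suspending sources that record when they start working.
fun getCached(log: MutableList<String>): Flow<String> = flow {
    log += "cache started"
    delay(20)
    emit("cached")
}

fun getFromServer(log: MutableList<String>): Flow<String> = flow {
    log += "server started"
    delay(100)
    emit("fresh")
}

// The opt-in lists both annotations because the one placed on flattenMerge
// has changed between library versions.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun cacheAndServer(log: MutableList<String>): Flow<String> =
    flowOf(getCached(log), getFromServer(log)).flattenMerge()

fun main(): Unit = runBlocking {
    val log = mutableListOf<String>()
    cacheAndServer(log).collect { log += "received $it" }
    // Both "started" entries should precede the first "received" entry.
    log.forEach { println(it) }
}
```

Note that a merged flow emits in arrival order, so the cached value comes first only as long as the cache really is the faster source.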
Cohleen answered 13/2, 2020 at 12:43 Comment(0)
S
3

The merge operator was recently added to Kotlin coroutines, in version 1.3.3. Here is the merged PR.

Using the merge operator, you should get each result as it arrives.
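A minimal sketch of merge, assuming a recent kotlinx.coroutines version and two hypothetical non-suspending sources. The cache is deliberately made the slower one here, to show that merge delivers values in arrival order rather than argument order:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sources; the delays are made up and the cache is the slow one on purpose.
fun getCached(): Flow<String> = flow { delay(150); emit("cached") }
fun getFromServer(): Flow<String> = flow { delay(30); emit("fresh") }

// Both flows are collected concurrently and their values are interleaved as they arrive.
fun firstCome(): Flow<String> = merge(getCached(), getFromServer())

fun main(): Unit = runBlocking {
    println(firstCome().toList()) // [fresh, cached]
}
```

If the cached value must always come first, a sequential combination is the safer choice; merge fits when arrival order is acceptable.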

Stockwell answered 13/2, 2020 at 10:12 Comment(0)
P
2

It turns out that with flowOf(someOperation()), someOperation() must complete before downstream processing can start. It's like Observable.just(someOperation()) in the RxJava world.
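The eager evaluation can be checked with a short sketch. Here someOperation is a hypothetical suspending, Flow-returning function and the log exists only for tracing; the flow builder is shown next to flowOf for contrast:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical suspending, Flow-returning function, shaped like the question's.
suspend fun someOperation(log: MutableList<String>): Flow<String> {
    delay(50)
    log += "someOperation returned"
    return flowOf("value")
}

suspend fun trace(): List<String> {
    val log = mutableListOf<String>()

    // Arguments are evaluated first, so someOperation() finishes before flowOf is called.
    val eagerFlow = flowOf(someOperation(log))
    log += "flowOf built"

    // The builder only stores the lambda; someOperation() waits for a collector.
    val lazyFlow = flow { emitAll(someOperation(log)) }
    log += "flow builder built"

    lazyFlow.collect { log += "collected $it" }
    return log
}

fun main(): Unit = runBlocking {
    trace().forEach { println(it) }
    // someOperation returned
    // flowOf built
    // flow builder built
    // someOperation returned
    // collected value
}
```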

In the second scenario, flatMapConcat is a transform operator: it maps each cached item to the flow from getDataFromServer() and emits only that flow's items, so the cached value itself never reaches collect.

There seems to be a lack of native concat-like operators in the Flow world. This is how I solved the problem in the end:

flow {
    remoteDataSource.getDataFromCache()
        .catch { error -> Timber.e(error) }
        .onCompletion {
            // The cache flow has finished: forward everything from the server.
            remoteDataSource.getDataFromServer()
                .collect { emit(it) }
        }
        .collect { emit(it) }
}
Pulverulent answered 13/2, 2020 at 6:53 Comment(0)
