Chain kotlin flows depends on Result state
Asked Answered
M

6

7

I'm looking for the most "clean" way to implement the following logic:

  • I have N methods, everyone returns Flow<Result<SOME_TYPE>> (type are different)
  • I want to chain these methods, so if 1 returns Result.Success, then call 2nd and so on.

The most obvious way to do it is:

methodA().map { methodAResult ->
  when (methodAResult) {
    is Result.Success -> {
      methodB(methodAResult).map { methodBResult ->
        when (methodBResult) {
          is Result.Success -> {
            methodC(methodAResult).map { methodCResult ->
              when (methodCResult) {
                is Result.Success -> TODO()
                is Result.Failure -> TODO()
              }
            }
          }
          is Result.Failure -> TODO()
        }
      }
     }
     is Result.Failure -> TODO()
   }
 }

But it looks like a well-known "callback hell". Do u have any ideas how to avoid it?

Mccusker answered 11/10, 2020 at 12:16 Comment(0)
D
7

I believe this could be flattened with transform operator:

methodA().transform { methodAResult ->
    when (methodAResult) {
        is Success -> methodB(methodAResult).collect { emit(it) }
        is Failure -> TODO()
    }
}.transform { methodBResult ->
    when (methodBResult) {
        is Success -> methodC(methodBResult).collect { emit(it) }
        is Failure -> TODO()
    }
}.transform { methodCResult ->
    when (methodCResult) {
        is Success -> TODO()
        is Failure -> TODO()
    }
}
Dynamotor answered 11/10, 2020 at 19:39 Comment(4)
Yeah, that looks like what I need, but what about Result.Failure? How can I "stop" the chain in this case? upd: I think there is no way to do it. Will test your solution tomorrow and hope will accept itMccusker
If you emit nothing in failure branch, then processing for this particular result won't be continued. If it is not enough, and you want to stop processing for all results (including even successful) after first failure, you may define some boolean var thereWasFailureInMethodProcessing = false, set it to true in failure branch, and add if (thereWasFailureInMethodProcessing) return@transform check before when block.Hopson
I would recommend flatMapMerge instead of transform, seems more natural. For the failure case, you can return an empty flow or transform the Failure result into a result of the correct type and return flowOf the new result. Could even replace the when block with a call to fold.Overtake
@MichaelKrussel if it is possible, can you provide code sample just like accepted answerUriniferous
O
6

A slight modification to the solution provided by Михаил Нафталь

    methodA()
        .flatMapMerge {
            when (it) {
                is Result.Success -> methodB(it)
                is Result.Failure -> emptyFlow()
            }
        }.flatMapMerge {
            when (it) {
                is Result.Success -> methodC(it)
                is Result.Failure -> emptyFlow()
            }
        }.collect {
            when (it) {
                is Result.Success -> TODO()
                is Result.Failure -> TODO()
            }
        }

Merging the output of one flow to another flow is the goal of flatMap so using flatMap seems a little cleaner.

If this Result class has a map, fold, or getOrNull type method this could be cleaned up a bit more and the when blocks could be removed.

Also if you need to propagate the failure to collect then you could replace the calls to emptyFlow with a flow that just outputs the failure that you want.

Overtake answered 3/12, 2020 at 22:15 Comment(0)
F
2

Badly a flatMap method still doesn't exist.

But you can use mapCatching :

methodA
    .mapCatching { a -> methodB(a).getOrThrow() }
    .mapCatching { b -> methodC(b).getOrThrow() }

Or make your own flatMap extension function :

fun <T, R> Result<T>.flatMap(block: (T) -> (Result<R>)): Result<R> {
    return this.mapCatching {
        block(it).getOrThrow()
    }
}

methodA
    .flatMap { a -> methodB(a) }
    .flatMap { b -> methodC(b) }
Fuzzy answered 4/2, 2022 at 14:52 Comment(0)
E
0

Currently if you are fine with external libs you can use result computation block from Arrow like so:

import arrow.core.raise.result
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.result.shouldBeFailure
import io.kotest.matchers.result.shouldBeSuccess
import io.kotest.matchers.shouldBe
// ...
fun methodA(): Result<String> = Result.success("1")
fun methodB(x: Int): Result<Int> = Result.success(x + 2)
fun methodC(x: String): Result<String> = Result.failure(Exception("Fail $x"))

result {
    val myint = methodA().bind().toInt()
    val mystring = methodB(myint).bind().toString()
    methodC(mystring).bind()
}.shouldBeFailure {
    it.message shouldBe "Fail 3"
}

result {
    val myint = methodA().bind().toInt()
    methodB(myint).bind().toString()
}.shouldBeSuccess {
    it shouldBe "3"
}
Emmert answered 3/8, 2023 at 8:57 Comment(0)
W
0

Also, in case if a success value collected upstream is necessary downstream you can chain a bunch of Flow using this:

fun <T1, T2, R> Flow<Result<T1>>.chain(
    other: Flow<Result<T2>>,
    transform: (T1, T2) -> R
): Flow<Result<R>> {
    return flow {
        [email protected](upstreamCollector = this) { result1 ->
            other.collectSuccessOrEmitError(upstreamCollector = this) { result2 ->
                emit(Result.success(transform(result1, result2)))
            }
        }
    }
}

private suspend fun <T, R> Flow<Result<T>>.collectSuccessOrEmitError(
    upstreamCollector: FlowCollector<Result<R>>,
    successCollector: FlowCollector<T>
) = catch {
    upstreamCollector.emit(Result.failure(it))
}.collect {
    it.fold(
        onSuccess = { value -> successCollector.emit(value) },
        onFailure = { throwable -> upstreamCollector.emit(Result.failure(throwable)) }
    )
}

And use will be like

flow1
  .chain(flow2) { a, b -> a + b }
  .chain(flow3) { a, b -> a to b }
  .collect {
      // do something
   }
Whiny answered 10/9, 2024 at 14:46 Comment(0)
S
-1

I believe that in this use case you should probably use suspend functions and compose them using await(). Errors should be passed through exceptions as described here.

Savate answered 11/10, 2020 at 14:43 Comment(5)
u can not use await on flowMccusker
As I've written, he needs to replace flows with suspending functions for that.Savate
its 2 different things. flow allow me to get "updates" from "publishers", suspending - used to run "heavy" work.Mccusker
You're right, flows are usually used on variables and not functions so I didn't notice. First, you should probably lose onSuccess and onFailure instead of when statement. I also find it hard to understand the use case. If you could provide an example, maybe I can give you a better answer.Savate
let's say u have a use case to add a new event. to add a new event u should check if event location is available (fun from location repository fun that returns a flow of Result<Location>), then checks if option (a predefined type of event) is available (fun from option repository that returns a flow of Result<Option>) and so on. They are not mentioned to be in a particular order, but have to respect the Result object (let's say if location checking returns Result.Failure, then we don't need to wait for other results from different repositories - option and so on)Mccusker

© 2022 - 2025 — McMap. All rights reserved.