How to suppress Error in a Coroutine Flow, so the Flow doesn't complete?
Asked Answered
T

3

5

I have a flow which could possible throw an error like so:

val myFlow = flow {
    emit("1")
    delay(2000)
    emit("2")
    delay(2000)
    emit("3")
    delay(2000)
    emit("4")
    delay(2000)
    throw Exception() // here it would throw an error
    delay(10000)
    emit("6")  // because the flow completes on error, it doesn't emit this
}

My problem is that, when the error is thrown even when I add a .catch { error -> emit("5") }.. it still completes the flow, and so "6" isnt emitted.

myFlow.catch { error ->
    emit("5")
}.onEach {
    println("$it")
}.onCompletion {
    println("Complete")
}.launchIn(scope)

And the result is:

1
2
3
4
5
Complete

I need it to be:

1
2
3
4
5
6
Complete

I want to swallow the error instead of making the flow complete. How can I achieve this?

Teuton answered 30/11, 2020 at 17:34 Comment(3)
try { throw Exception() } catch(e: Exception) { emit("5") }Hogback
Could you explain your use-case about what you want to achieve? So that proper solution can be providedAneroidograph
https://mcmap.net/q/886238/-how-to-resume-flow-after-exceptionRedskin
C
6

OK, I know it's not exactly the same example but I suspect my case is somewhat similar.

Let`s say you have some kind of risky flow. The flow may throw an exception. But you want to maintain the connection with the flow even after exception, because it emits some important, e.g., server real-time updates.

And your goal is suppress an exception or convert it to some data and continue listening to the real-time updates no matter what.

    var counter = 0

    val riskyFlow = flow {

        while (true) {
            counter++

            delay(1000)

            if (counter == 3) 
                throw IllegalStateException("Oops! Error.")
            else 
                emit("Server update")
            
        }
    }

If you use catch, risky-flow will complete after you emit something you want on error.

riskyFlow
        .catch { cause -> emit("Emit on error") }
        .onEach { println(it) }
        .launchIn(GlobalScope)

Solution

Use retry or retryWhen. This way you suppress exception by emitting some data on exception, then you'll restart the connection to the flow right away, so it'll continue emitting its data.

riskyFlow
        .retryWhen { cause, attempt ->
            emit("emit on error")
            true
        }
        .onEach { println(it) }
        .launchIn(GlobalScope)

The output is:

I/System.out: Server update
I/System.out: Server update
I/System.out: emit on error
I/System.out: Server update
I/System.out: Server update
Catechu answered 4/8, 2022 at 20:29 Comment(0)
J
0

This is not possible in your current example since the last 2 lines in your flow are unreachable.

you should deal with the exception inside of your flow meaning catch the exception in the flow and emit 5 in your example.

Like this

val myFlow = flow {
        emit("1")
        delay(2000)
        emit("2")
        delay(2000)
        emit("3")
        delay(2000)
        emit("4")
        delay(2000)
        try {
            throw Exception() // here it would throw an error
        } catch (e: Exception) {
            emit("5")
        }
        delay(10000)
        emit("6")  // because the flow completes on error, it doesn't emit this
    }
Jammie answered 30/11, 2020 at 19:21 Comment(2)
@ArchieG.Quiñones cold flows follow standard execution logic. Your question is equivalent to "how to continue to execute body of a method after exception is thrown". This is not possible because exception throwing breaks the execution, that's how JVM works.Cloverleaf
This is wrong. Since exception is catched in flow, flow does not complete and "6" is emitted.Jule
G
0

Encountered same problem today, with some extension functions magic here is my solution:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        flow {
            for (i in 0..10) {
                emit(i)
            }
        }
            .allowErrors()
            .onEachSuccess {
                if (it == 3 || it == 6) {
                    throw RuntimeException("$it !!!")
                }
            }
            .mapSuccess { "Success :$it; " }
            .onEachError {
                println("ERROR: ${it.message}")
            }
            .reduceSuccess { a, b -> a + b }
            .also { println(it) }
    }   
}

fun <T> Flow<T>.allowErrors(): Flow<Pair<T?, Throwable?>> = transform { value ->
    return@transform emit(value to null)
}

fun <T> Flow<Pair<T?, Throwable?>>.filterSuccess(predicate: suspend (T) -> Boolean): Flow<Pair<T?, Throwable?>> =
    transform { value ->
        if (value.second == null) {
            try {
                if (predicate(value.first!!)) return@transform emit(value)
            } catch (e: Exception) {
                emit(value.first to e)
            }
        } else {
            return@transform emit(value)
        }
    }


fun <T> Flow<Pair<T?, Throwable?>>.onEachSuccess(action: suspend (T) -> Unit): Flow<Pair<T?, Throwable?>> =
    transform { value ->
        if (value.second == null) {
            try {
                action(value.first!!)
                return@transform emit(value)
            } catch (e: Exception) {
                emit(value.first to e)
            }
        } else {
            return@transform emit(value)
        }
    }

fun <T, R> Flow<Pair<T?, Throwable?>>.mapSuccess(transform: suspend (T) -> R): Flow<Pair<R?, Throwable?>> =
    transform { value ->
        if (value.second == null) {
            try {
                return@transform emit(transform(value.first!!) to null)
            } catch (e: Exception) {
                emit(null to e)
            }
        } else {
            return@transform emit(null to value.second)
        }
    }

fun <T> Flow<Pair<T?, Throwable?>>.onEachError(action: suspend (Throwable) -> Unit): Flow<Pair<T?, Throwable?>> =
    transform { value ->
        if (value.second != null) {
            action(value.second!!)
            return@transform emit(value)
        } else {
            return@transform emit(value)
        }
    }

suspend fun <T> Flow<Pair<T?, Throwable?>>.collectSuccess(collector: FlowCollector<T>) {
    [email protected] { it.second == null }.map { it.first!! }.collect(collector)
}

suspend fun <S, T : S> Flow<Pair<T?, Throwable?>>.reduceSuccess(operation: suspend (accumulator: S, value: T) -> S): S {
    return [email protected] { it.second == null }.map { it.first!! }.reduce(operation)
}

https://pl.kotl.in/HOpFfv6Hc

Gulledge answered 12/9, 2023 at 18:15 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.