Kotlin Flow: Testing hangs

I am trying to test a Kotlin implementation that uses Flows. I use Kotest for testing. This code works:

ViewModel:

val detectedFlow = flow<String> {
    emit("123")
    delay(10L)
    emit("123")
}

Test:

class ScanViewModelTest : StringSpec({
    "when the flow contains values they are emitted" {
        val detectedString = "123"
        val vm = ScanViewModel()
        launch {
            vm.detectedFlow.collect {
                it shouldBe detectedString
            }
        }
    }
})

However, in the real ViewModel I need to add values to the flow, so I use ConflatedBroadcastChannel as follows:

private val _detectedValues = ConflatedBroadcastChannel<String>()
val detectedFlow = _detectedValues.asFlow()

suspend fun sendDetectedValue(detectedString: String) {
    _detectedValues.send(detectedString)
}

Then in the test I try:

"when the flow contains values they are emitted" {
    val detectedString = "123"
    val vm = ScanViewModel()
    runBlocking {
        vm.sendDetectedValue(detectedString)
    }
    runBlocking {
        vm.detectedFlow.collect { it shouldBe detectedString }
    }
}

The test just hangs and never completes. I tried all kinds of things: launch or runBlockingTest instead of runBlocking, putting sending and collecting in the same or separate coroutines, offer instead of send... Nothing seems to fix it. What am I doing wrong?

Update: If I create the flow manually, it works:

private val _detectedValues = ConflatedBroadcastChannel<String>()
val detectedFlow =  flow {
    this.emit(_detectedValues.openSubscription().receive())
}

So is this a bug in the asFlow() method?

Juryrigged answered 13/5, 2020 at 10:36 Comment(2)
Did you try launching on other dispatchers, just for debugging purposes? - Oracle
Yes, I did. No effect. By the way, I tried the newly released StateFlow/MutableStateFlow and the issue persists. - Juryrigged

The problem is that collect, which you call in your test, is a suspending function: it suspends execution until the Flow completes.

In the first example, your detectedFlow is finite. It emits two values and finishes. In your question update, you are also creating a finite flow that emits a single value and finishes. That is why those tests work.

However, in the second (real-life) example the flow is created from a ConflatedBroadcastChannel that is never closed, so collect suspends forever. To make the test work without blocking the thread forever, you need to make the flow finite too. I usually use the first() operator for this. Another option is to close the ConflatedBroadcastChannel, but that usually means modifying your production code just for the sake of the test, which is not good practice.

This is how your test would work with the first() operator:

"when the flow contains values they are emitted" {
    val detectedString = "123"
    val vm = ScanViewModel()
    runBlocking {
        vm.sendDetectedValue(detectedString)
    }
    runBlocking {
        vm.detectedFlow.first() shouldBe detectedString
    }
}
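
To see the difference between collect and first() outside of Kotest, here is a minimal self-contained sketch. It makes some assumptions: FakeScanViewModel and collectCompletes are hypothetical names invented for illustration, and the view model is backed by MutableStateFlow rather than ConflatedBroadcastChannel (the question's comments report the same hang with StateFlow, and it also never completes). The 200 ms timeout is arbitrary.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for ScanViewModel (assumption: StateFlow-backed).
// Like a never-closed ConflatedBroadcastChannel, this flow never completes.
class FakeScanViewModel {
    private val _detectedValues = MutableStateFlow("")
    val detectedFlow: Flow<String> = _detectedValues

    fun sendDetectedValue(detectedString: String) {
        _detectedValues.value = detectedString
    }
}

// Hypothetical helper: returns true only if collect finishes by itself
// before the timeout; withTimeoutOrNull returns null when it times out.
suspend fun collectCompletes(flow: Flow<String>, timeoutMs: Long): Boolean =
    withTimeoutOrNull(timeoutMs) { flow.collect { } } != null

fun main() = runBlocking {
    val vm = FakeScanViewModel()
    vm.sendDetectedValue("123")

    // collect keeps waiting for further values, so the timeout has to end it.
    println("collect completed on its own: ${collectCompletes(vm.detectedFlow, 200L)}")

    // first() stops collecting after one value, so it returns normally.
    println("first(): ${vm.detectedFlow.first()}")
}
```

Wrapping collect in a timeout like this is only meant to show that it does not return by itself; in a real test, first() (or another operator that makes the flow finite) is the cleaner choice.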
Squirm answered 7/6, 2020 at 7:31 Comment(4)
What if our entire test function is marked as runBlocking? - Marlowe
@Marlowe I believe the entire test function would also hang in that case. How do we handle such cases where production code uses collect on flows and channels? - Palladino
@TomášHavlíček collect doesn't hang, as it is asynchronous. - Marlowe
@Marlowe collect suspends until the flow ends. - Lanai