Kotlin flow - how to handle cancellation

I'm learning Kotlin coroutines and flows, and one thing is a bit unclear to me. If I have a long-running loop in a regular coroutine, I can use isActive or ensureActive to handle cancellation. Those aren't available inside a flow, yet the following code still finishes the flow properly:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory

private val logger = LoggerFactory.getLogger("Main")

fun main() {
    val producer = FlowProducer()
    runBlocking {
        producer
            .produce()
            .take(10)
            .collect {
                logger.info("Received $it")
            }
    }
    logger.info("done")
}


class FlowProducer {
    fun produce() = flow {
        try {
            var counter = 1
            while (true) {
                logger.info("Before emit")
                emit(counter++)
                logger.info("After emit")
            }
        } finally {
            logger.info("Producer has finished")
        }

    }.flowOn(Dispatchers.IO)
}

Why is that the case? Is it because emit is a suspending function that handles cancellation for me? And what should I do if emit is called conditionally? For example, that loop actually polls records from Kafka and calls emit only when the received records are not empty. Then we can end up in this situation:

  1. We want 10 messages (take(10)).
  2. There are exactly 10 messages on the Kafka topic.
  3. Since there are no more messages, emit won't be called again. So even though we have received all the messages we want, the loop keeps wasting resources on unnecessary polling.

I'm not sure my understanding is correct. Should I call yield() on each loop iteration in such a case?

Shrift asked 23/7, 2021 at 10:43
I think you need to be more precise with your question. With this code you can certainly get all 10 messages. If you just want to receive the first 10 messages, you can add the logic for that and keep running the while loop until you have emitted 10 messages; there is no need for yield. If, however, you want to cancel the producer in a certain case, wouldn't currentCoroutineContext().cancel() be what you are looking for? That cancels the context of the current coroutine. - Maghutte

The important thing to remember here is that flows are "cold", at least in their simple form. What that means is that a flow isn't capable of doing any work except while you are actively consuming data from it. A cold flow doesn't have a coroutine associated with it. You can learn a little more from this blog post by Roman Elizarov.
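
To see what "cold" means in practice, here is a minimal sketch using plain kotlinx.coroutines (no Kafka; the names numbers and coldFlowDemo are just for illustration). It records when the flow body runs: creating the flow does nothing, and the body only executes once toList() starts collecting it, inside the collecting coroutine.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Building this flow only stores the lambda; nothing inside it runs yet.
fun numbers(log: MutableList<String>): Flow<Int> = flow {
    log += "body started"
    for (i in 1..3) emit(i)
}

fun coldFlowDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val f = numbers(log)      // no work happens here
    log += "flow created"
    val items = f.toList()    // collecting runs the body now, in this coroutine
    log += "collected $items"
    log
}
```

Calling println(coldFlowDemo()) from a main function should print [flow created, body started, collected [1, 2, 3]].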

When you call collect on a flow, control is transferred from the collector to the flow. This is what enables the flow to do work. The collector is effectively executing the code inside the flow. When the flow calls emit, control transfers back to the collector. If you're familiar with Kotlin's sequence builder, flows work very similarly.
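
The hand-off between emit and the collector can be made visible with a small log (a sketch; handOffDemo is a hypothetical name). Because there is no flowOn or buffer here, everything runs in one coroutine, so each emit call runs the collector's block to completion before the next line of the flow executes.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun handOffDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..2) {
            log += "before emit $i"
            emit(i)                      // runs the collector's block, then returns here
            log += "after emit $i"
        }
    }.collect { value ->
        log += "collector got $value"    // executes inside the emit(i) call above
    }
    log
}
```

The returned list should read before emit 1, collector got 1, after emit 1, before emit 2, collector got 2, after emit 2: the same interleaving you would get by iterating sequence { yield(i) } with forEach.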

By definition, this means that if you stop collecting the flow, the flow stops doing any work. In your case, because you used take(10), the collector will stop executing the flow once it has received ten items. Because the collector is the thing that's actually executing the loop inside the flow, the loop doesn't continue to run when the collector is no longer collecting. Once you stop using the flow, it's just like an iterator that's no longer being iterated over. It can be garbage collected like any other object.
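
The effect of take can be checked with the same kind of log, here using an endless counter loop like the one in the question but without flowOn (a sketch; takeDemo is illustrative). Assuming a kotlinx.coroutines release from the last few years, take stops its upstream right after the last requested item has been delivered, so the finally block runs and the loop never reaches a fourth emit.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun takeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        try {
            var counter = 1
            while (true) {                 // never ends on its own
                log += "emit $counter"
                emit(counter++)
            }
        } finally {
            log += "producer finished"     // runs once take(3) has what it needs
        }
    }.take(3).collect { log += "got $it" }
    log
}
```

The returned log should be emit 1, got 1, emit 2, got 2, emit 3, got 3, producer finished.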

You asked whether you should call yield() inside your flow. There are some situations where this could be useful, and you can read more about flow cancellation checks in the docs. In your case, it's not necessary, because:

  1. The cancellation checks are only needed to detect when something has cancelled the coroutine that is executing the flow. When the flow aborts itself, such as when take(10) has emitted 10 items, it simply terminates normally, without cancelling any coroutines.
  2. The flow is built using emit, which already checks for cancellation.
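
Point 2 can be observed directly. In this sketch (emitCancellationDemo is an illustrative name) the collector cancels the coroutine that is running the flow after the third item. The flow body has no isActive or ensureActive of its own, yet the loop stops, because the next emit call checks for cancellation and throws. This relies on the per-emit cancellation check that the flow { } builder performs in recent kotlinx.coroutines versions, as described in the flow cancellation docs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun emitCancellationDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        flow {
            var counter = 1
            try {
                while (true) emit(counter++)   // no explicit cancellation check here
            } finally {
                log += "producer finished"
            }
        }.collect { value ->
            log += "got $value"
            if (value == 3) cancel()           // cancels this launch coroutine's Job
        }
    }
    job.join()
    log += "job cancelled = ${job.isCancelled}"
    log
}
```

The expected log is got 1, got 2, got 3, producer finished, job cancelled = true: the fourth emit throws a CancellationException instead of delivering another value.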

Even when cancellation checks aren't required, it's still possible to create a flow that runs forever. As mentioned above, control only transfers back to the collector each time the flow calls emit. So if your flow runs indefinitely without calling emit, it will never return control back to the collector. This is the same as writing an infinite loop in normal code, and isn't particularly special to flows.

Note that it is possible to create a hot flow that has a coroutine doing work in the background. In that case, you would need to make sure that the coroutine responds correctly to cancellation of the flow.
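
One case worth checking against the question: FlowProducer ends with flowOn(Dispatchers.IO). As far as I understand the operator, changing the dispatcher makes flowOn run the upstream flow in a separate coroutine and hand values over through a buffer, which is close to the background-coroutine situation described here. When take is satisfied, that coroutine is cancelled, but a polling loop that has stopped calling emit never suspends and so never notices; that would match the endless loop and the yield() fix reported in the comments. The sketch below assumes a made-up FakeSource in place of the Kafka consumer and uses an explicit ensureActive() check (yield() should behave the same here).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a Kafka consumer: hands out `available` records,
// then returns null forever, like a topic that has run dry.
class FakeSource(private val available: Int) {
    private var served = 0
    fun poll(): Int? = if (served < available) ++served else null
}

fun pollingFlow(source: FakeSource): Flow<Int> = flow {
    while (true) {
        // flowOn(Dispatchers.IO) runs this loop in its own coroutine. Once the
        // downstream take() is satisfied that coroutine is cancelled, but a loop
        // that never suspends only notices if it checks explicitly.
        currentCoroutineContext().ensureActive()   // yield() would also work
        val record = source.poll()
        if (record != null) emit(record)           // emit only when there is data
    }
}.flowOn(Dispatchers.IO)

fun pollingDemo(): List<Int> = runBlocking {
    pollingFlow(FakeSource(available = 10)).take(10).toList()
}
```

pollingDemo() should return the numbers 1 through 10. Without the ensureActive() line the same program is expected to hang, because the producer coroutine keeps spinning after the tenth record. In real code, a blocking poll with a timeout (or a delay between empty polls) would also avoid burning a thread while the topic is empty.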

Tyndall answered 26/7, 2021 at 11:56
Thanks. What confuses me is that after take() has received all its elements, emit apparently has to be called one more time for the flow to finish. As I said, with only 10 elements in the Kafka topic and take(10), it doesn't stop. There has to be an 11th element in the topic, so that emit is called an 11th time; only then does the flow stop, without emitting that element. Using yield() solved the issue, since the flow can then stop on either emit() or yield(). - Shrift
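someFlow // hypothetical placeholder for any Flow<T>
    .onCompletion { cause ->
        // cause is null after normal completion; after cancellation it is a CancellationException
        val isCancelled = (cause is CancellationException)
    }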
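
Whether isCancelled comes out true depends on where onCompletion sits relative to take. A sketch, assuming current kotlinx.coroutines behaviour in which take stops its upstream with an internal exception that extends CancellationException: an onCompletion placed before take sees that exception, while one placed after take sees a normal completion (null). completionCauses is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun completionCauses(): Pair<Throwable?, Throwable?> = runBlocking {
    var upstreamCause: Throwable? = null
    var downstreamCause: Throwable? = null
    flow {
        var i = 0
        while (true) emit(i++)
    }
        .onCompletion { cause -> upstreamCause = cause }    // sees take()'s internal abort
        .take(3)
        .onCompletion { cause -> downstreamCause = cause }  // sees a normal completion
        .toList()
    upstreamCause to downstreamCause
}
```

So in the question's setup, an onCompletion placed above take would report the flow as cancelled even though, from the collector's point of view, it finished normally.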
Halftrack answered 7/11, 2023 at 13:33

Yes, emit will throw CancellationException when take cancels the flow.

The Kafka example you give will actually work, because take will cancel the flow at the end of the 10th emit, not at the start of the 11th.
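
This claim can be tested without Kafka by faking a topic that holds exactly 10 records (a sketch; conditionalEmitDemo and the polls counter, which stands in for real polling, are hypothetical). Like the original FlowProducer but without flowOn, the loop below only emits when a poll returns something. If take aborts at the end of the 10th emit, collection finishes and polls stays at 10; if it waited for an 11th emit, this would spin forever.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun conditionalEmitDemo(): Pair<List<Int>, Int> = runBlocking {
    var polls = 0
    // Hypothetical Kafka-like source: a "topic" with exactly 10 records, so every
    // poll after the 10th returns nothing and nothing is emitted.
    val records = flow {
        while (true) {
            polls++
            val record: Int? = if (polls <= 10) polls else null
            if (record != null) emit(record)   // conditional emit, as in the question
        }
    }
    val items = records.take(10).toList()
    items to polls
}
```

With a recent kotlinx.coroutines this should return the records 1 through 10 together with a poll count of 10. The sketch deliberately leaves out flowOn(Dispatchers.IO): with flowOn the upstream runs in its own coroutine, so a loop that stops emitting also needs a cancellation check such as yield() or ensureActive(), which is what the question's author ended up adding.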

Rani answered 23/7, 2021 at 12:06
Are you sure? Why doesn't this slightly modified code work, then? It never stops:

class FlowProducer {
    fun produce() = flow {
        try {
            var counter = 1
            while (true) {
                if (counter <= 10) {
                    logger.info("Before emit $counter")
                    emit(counter)
                    logger.info("After emit $counter")
                    counter++
                }
            }
        } finally {
            logger.info("Producer has finished")
        }
    }.flowOn(Dispatchers.IO)
}

- Shrift
Ha, now I'm not entirely sure... I guess we'll have to look at the source. I'll be quite disappointed if it waits until the 11th emit to cancel. - Rani
I don't think emit could throw CancellationException in this scenario. After the flow is aborted by take, the collector stops executing the flow, so the call to emit would not be reached. Because take is a normal termination of the flow, it doesn't propagate a cancellation exception. Doing so would cause the coroutine collecting the flow to also fail with a cancellation exception, which isn't what you want when you use take. - Tyndall
@Marcin, from the source, it looks like take will cancel the flow when you try to get the 11th item, before it returns control to the source. kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/… - Rani
@MattTimmermans, that's exactly what I've encountered. With only 10 calls to emit(), this loop runs forever even though I already have all the items I requested, which is a bit confusing. Adding yield() solved the problem. - Shrift
