I'm learning kotlin coroutines and flows and one thing is a little bit obscure to me. In case I have a long running loop for the regular coroutines I can use isActive or ensureActive to handle cancelation. However those are not defined for a flow but nevertheless the following code properly finishes the flow:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
private val logger = LoggerFactory.getLogger("Main")
fun main() {
val producer = FlowProducer()
runBlocking {
producer
.produce()
.take(10)
.collect {
logger.info("Received $it")
}
}
logger.info("done")
}
class FlowProducer {
fun produce() = flow {
try {
var counter = 1
while (true) {
logger.info("Before emit")
emit(counter++)
logger.info("After emit")
}
}finally {
logger.info("Producer has finished")
}
}.flowOn(Dispatchers.IO)
}
Why is that a case? Is it because the emit is a suspendable function that handles cancelation for me? What to do in case the emit is called conditionally? For example that loop actually polls records from Kafka and calls emit only when the received records are not empty. Then we can have the situation that:
- We want 10 messages(take 10)
- Actually there are only 10 messages on the kafka topic
- Since there are no more messages the emit won't be called again and therefore even though we received all messages we want, the loop will continue to waste resources on unnecessary polling.
Not sure if my understanding is correct. Should I call yield() on each loop in such case?
currentCoroutineContext().cancel()
be the answer for what you are looking for? That will cancel the context of the current coroutine. – Maghutte