How to emit Flow value from different function? Kotlin Coroutines
D

5

35

I have a flow:

val myflow = kotlinx.coroutines.flow.flow<Message>{}

and want to emit values into it from a separate function:

override suspend fun sendMessage(chat: Chat, message: Message) {
    myflow.emit(message)
}

But the compiler does not allow this. Are there any workarounds?

Dorelle answered 7/5, 2020 at 10:7 Comment(1)
Do you mean you cannot access an externally declared and instantiated variable? You can pass the flow's reference to the function or, much better, encapsulate the reference in your class through a constructor/builder as a required parameter. Can you share the error with us? - Neat
P
10

The answer of Animesh Sahu is pretty much correct. You can also return a Channel as a flow (see consumeAsFlow or asFlow on a BroadcastChannel).

There is also StateFlow, currently in development by the Kotlin team, which is partly meant to provide similar behavior, although it is unknown when it will be ready.

EDIT: StateFlow and SharedFlow have been released as part of a stable API (https://blog.jetbrains.com/kotlin/2020/10/kotlinx-coroutines-1-4-0-introducing-stateflow-and-sharedflow/). These tools can and should be used when state management is required in an async execution context.

Plunger answered 7/5, 2020 at 11:46 Comment(3)
Correct, +1 for that. They can be converted to a flow, but note that they will still be hot. - Hypergolic
It seems StateFlow was introduced just a few hours ago, in coroutines release 1.3.6. - Plunger
Per https://mcmap.net/q/450408/-kotlin-mutablestateflow-collect-is-dropping-values there are limitations tied to collection speed, because StateFlow is conflated. Be careful! - Impinge
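
To make the conflation caveat from the last comment concrete, here is a minimal sketch. The function name observeConflated and the list seen are made up for illustration. It assumes a single-threaded runBlocking scope, where the collector has no chance to run between the three assignments, so it should observe the initial value and the latest one while the intermediate values are skipped.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values a collector actually observed.
fun observeConflated(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // UNDISPATCHED runs the collector right away, up to its first suspension,
    // so it receives the initial value before any update happens.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }

    // Three updates with no suspension point in between:
    // the collector cannot run until this coroutine yields.
    state.value = 1
    state.value = 2
    state.value = 3

    yield()       // give the collector a turn
    job.cancel()  // a StateFlow collection never completes on its own
    seen
}

fun main() {
    println(observeConflated())
}
```

If every value matters, a SharedFlow with a buffer or a Channel is usually a better fit than StateFlow.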
O
20

You can use the StateFlow or SharedFlow APIs for this use case. Here is a sample using StateFlow.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val chatFlow = MutableStateFlow<String>("")
    
fun main() = runBlocking {

    // Observe values
    val job = launch {
        chatFlow.collect {
            print("$it ")
        }
    }

    // Change values
    arrayOf("Hey", "Hi", "Hello").forEach {
        delay(100)
        sendMessage(it)
    }

    delay(1000)
    
    // Cancel running job
    job.cancel()
    job.join()
}

suspend fun sendMessage(message: String) {
    chatFlow.value = message
}

You can test this code in the Kotlin Playground: https://pl.kotl.in/DUBDfUnX3
Orient answered 23/5, 2020 at 5:13 Comment(0)
H
4

A Flow is self-contained: once the block (lambda) inside the flow builder has finished executing, the flow is over. You have to do your work inside that block and emit values from there.
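
A minimal sketch of what that means in practice. The names greetings, blockRuns and collectTwice are made up for illustration. The Flow interface itself only exposes collect; emit belongs to the FlowCollector receiver of the builder lambda, which is why calling myflow.emit(...) from another function does not compile.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter, used only to show how often the builder block runs.
var blockRuns = 0

// emit() is only callable on the FlowCollector receiver inside this lambda.
val greetings: Flow<String> = flow {
    blockRuns++
    emit("Hey")
    emit("Hi")
}

// Collects the same flow twice; each collection re-runs the block from the start.
fun collectTwice(): List<List<String>> = runBlocking {
    listOf(greetings.toList(), greetings.toList())
}

fun main() {
    println(collectTwice())                    // [[Hey, Hi], [Hey, Hi]]
    println("block ran $blockRuns times")      // block ran 2 times
}
```

Nothing runs until a collector shows up, and every collector gets its own run of the block, so there is no shared stream to push values into from outside.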

A similar GitHub issue says:

Afaik Flow is designed to be a self contained, replayable, cold stream, so emission from outside of it's own scope wouldn't be part of the contract. I think what you're looking for is a Channel.

IMHO you are probably looking for Channels, or specifically a ConflatedBroadcastChannel if you have multiple receivers. The difference between a normal channel and a broadcast channel is that multiple receivers can listen to a broadcast channel through the openSubscription function, which returns a ReceiveChannel associated with the BroadcastChannel.

Hypergolic answered 7/5, 2020 at 10:34 Comment(0)
F
3

Use a SharedFlow (MutableSharedFlow); it has everything you need.

Initialization of your flow:

val myFlow = MutableSharedFlow<Message>()

Now it should work the way you originally tried:

override suspend fun sendMessage(chat: Chat, message: Message) {
    myFlow.emit(message)
}
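
One thing to watch for, shown as a rough sketch below: with the default MutableSharedFlow() (replay = 0, no extra buffer), a value emitted while nobody is collecting is dropped. The Message data class here is a stand-in for the question's type, and demo is a made-up helper name.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Stand-in for the question's Message type (hypothetical).
data class Message(val text: String)

// Defaults: replay = 0 and no extra buffer capacity.
val myFlow = MutableSharedFlow<Message>()

suspend fun sendMessage(message: Message) {
    myFlow.emit(message)
}

// Hypothetical helper: returns what a subscriber actually received.
fun demo(): List<Message> = runBlocking {
    // Emitted before anyone subscribes: dropped, because replay is 0.
    sendMessage(Message("lost"))

    // UNDISPATCHED starts the collector immediately, so it is already
    // subscribed by the time the next emit runs.
    val received = async(start = CoroutineStart.UNDISPATCHED) {
        myFlow.take(2).toList()
    }

    sendMessage(Message("Hey"))
    sendMessage(Message("Hi"))
    received.await()
}

fun main() {
    println(demo()) // [Message(text=Hey), Message(text=Hi)]
}
```

If late subscribers should also see earlier messages, passing a replay value to MutableSharedFlow is one option.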
Fortyniner answered 5/10, 2021 at 9:22 Comment(0)
B
0

You need a channel like:

private val _navigationAction = Channel<Route>(Channel.CONFLATED)
override val navigationAction: Flow<Route> = _navigationAction.receiveAsFlow()

override fun navigateTo(target: Route) {
    _navigationAction.trySend(target)
}
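
A self-contained variant of the same idea, as a sketch only. The class name Navigator is made up, Route is reduced to a String, and the channel uses Channel.BUFFERED instead of Channel.CONFLATED so that both values survive until collection (with CONFLATED only the most recent one would be kept).

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical class; routes are plain strings here.
class Navigator {
    private val _navigationAction = Channel<String>(Channel.BUFFERED)
    val navigationAction: Flow<String> = _navigationAction.receiveAsFlow()

    // trySend does not suspend, so this can be a regular function.
    // The result is false if the buffer is full or the channel is closed.
    fun navigateTo(target: String): Boolean =
        _navigationAction.trySend(target).isSuccess
}

// Hypothetical helper: sends two routes, then collects them.
fun demoNavigation(): List<String> = runBlocking {
    val navigator = Navigator()
    navigator.navigateTo("home")
    navigator.navigateTo("settings")
    // Unlike a SharedFlow without replay, the channel keeps buffered
    // values until a collector receives them.
    navigator.navigationAction.take(2).toList()
}

fun main() {
    println(demoNavigation()) // [home, settings]
}
```

Note that a channel exposed through receiveAsFlow hands each value to only one collector, so with several collectors the values are split between them rather than broadcast.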
Blythe answered 22/6, 2023 at 9:43 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.