Why I can't use an emit function in Kotlin Flow like rxJava.Single.create?
Asked Answered
B

2

5

I'm trying to rewrite interactors with rxjava chains to kotlin flow. In LocationHandlerImpl I'm using LocationService for getting my current location. In addOnSuccessListener and addOnFailureListener I'm emitting my model but having error:

emit

"Suspension function can be called only within coroutine body". Am i doing it wrong? But i can call emit outside of listeners (look below flow builder)

Betaine answered 22/11, 2019 at 8:31 Comment(0)
R
0

As mentioned from Prokash (here), Flows are designed to be self contained. Your location services listener is not within the scope of the Flow.

You can however check out the callbackFlow which provides you the mechanics you are looking for to build a flow using a Callback-based API.

Callback Flow Documentation, be aware that the callback flow is still in Experimental phase.

Reginareginald answered 22/11, 2019 at 9:4 Comment(5)
it worked for me. The error disappeared, I had to add an annotation about experimental apiBetaine
@IliyaMashin did the callbackFlow worked for you? If yes, you can mark the correct answer so your question is marked as solved.Reginareginald
it didn't. compile error disappears but i had new exception ClosedSendChannelException at runtime. when i called offer(T)Betaine
Can you post some actual code snippets on the current state of your code? It will help us a lot.Reginareginald
sorry. it was my mistake. i just forgot about "awaitClose". it works but i can't enter to onCompletion blockBetaine
A
6

It seems that you are trying to get the last location from the Android location service. This is one of many Task-returning calls in the Google Play Services. Kotlin already has a module, kotlinx-coroutines-play-services, that contributes a function

suspend fun <T> Task<T>.await(): T?

With that in your project, you can simply write this:

suspend fun getMyLocation(): Location? =
        LocationServices.getFusedLocationProvider(context)
                .lastLocation
                .await()

If you want to integrate it with other Flow-based code, add this wrapper function:

fun <T> Task<T>.asFlow() = flow { emit(await()) }

and now you can write

fun getLocationAsFlow(): Flow<Location?> =
        LocationServices.getFusedLocationProvider(context)
                .lastLocation
                .asFlow()

If, for educational purposes, you would like to see how it can be implemented directly, without the additional module, then the most straightforward approach would be as follows:

fun getLocationAsFlow() = flow {
    val location = suspendCancellableCoroutine<Location?> { cont ->
        LocationServices.getFusedLocationProvider(context)
                .lastLocation
                .addOnCompleteListener {
                    val e = exception
                    when {
                        e != null -> cont.resumeWithException(e)
                        isCanceled -> cont.cancel()
                        else -> cont.resume(result)
                    }
                }
    }
    emit(location)
}

This is the result of inlining a simplified implementation of Task.await() into its use site.

Apoloniaapolune answered 22/11, 2019 at 15:6 Comment(2)
Why does it need to be suspend function?Destalinization
For context, you were referring to getLocationAsFlow(), which doesn't have to be suspendable. Removed.Apoloniaapolune
R
0

As mentioned from Prokash (here), Flows are designed to be self contained. Your location services listener is not within the scope of the Flow.

You can however check out the callbackFlow which provides you the mechanics you are looking for to build a flow using a Callback-based API.

Callback Flow Documentation, be aware that the callback flow is still in Experimental phase.

Reginareginald answered 22/11, 2019 at 9:4 Comment(5)
it worked for me. The error disappeared, I had to add an annotation about experimental apiBetaine
@IliyaMashin did the callbackFlow worked for you? If yes, you can mark the correct answer so your question is marked as solved.Reginareginald
it didn't. compile error disappears but i had new exception ClosedSendChannelException at runtime. when i called offer(T)Betaine
Can you post some actual code snippets on the current state of your code? It will help us a lot.Reginareginald
sorry. it was my mistake. i just forgot about "awaitClose". it works but i can't enter to onCompletion blockBetaine

© 2022 - 2024 — McMap. All rights reserved.