How does Continuation work in Kotlin Coroutine?
I am studying CPS (continuation-passing style) and wondering how it works.

Object createPost(
    Token token,
    Item item,
    Continuation<Post> cont) {...}
interface Continuation<in T> {
    val context: CoroutineContext
    fun resume(value: T)
    fun resumeWithException(exception: Throwable)
}

People say CPS is just callbacks and nothing more.

But:

  1. I don't know why an interface is used here as a parameter.
  2. I don't know what <in T> does in the Continuation interface.
  3. Continuation is a parameter, but what does it actually do inside, and how is it called under the hood?
Sherlock answered 11/9, 2022 at 13:1 Comment(0)
S
2

A continuation is an object that stores the state of a coroutine. It must also store the local variables and the place where the coroutine was suspended. We can use this object to resume the coroutine by calling resume(), resumeWith() or resumeWithException().

suspendCancellableCoroutine is a function from the kotlinx.coroutines library. We could instead use the suspendCoroutine function from the Kotlin standard library, which behaves the same way here and also provides a continuation object. I will use suspendCancellableCoroutine later in this answer; it suspends the coroutine and also makes the suspension cancellable.

// code 1
suspend fun getEmployee() {
    println("Start main()")
    var inc: Int = 0
    val id = getID()
    inc++
    println("End main() $inc $id")
}

In the code above, the suspension point is getID(). The continuation stores the local variables of getEmployee() that are used after the suspension point; here we need inc, because it is used after getID() returns.

There are a few ways in which suspending functions could have been implemented, but the Kotlin team decided on an option called continuation-passing style. This means that continuations are passed from function to function as arguments. By convention, a continuation takes the last parameter position. For our code it looks like this:

suspend fun getStudent(rollno: Int): Student?
suspend fun setStudentRollno(name: String): Unit
suspend fun showData(): Unit

// under the hood
fun getStudent(rollno: Int, continuation: Continuation<*>): Any?
fun setStudentRollno(name: String, continuation: Continuation<*>): Any
fun showData(continuation: Continuation<*>): Any

You might have also noticed that the result type under the hood is different from the originally declared one. It has changed to Any or Any?. Why so? The reason is that a suspending function might be suspended, and so it might not return a value of the declared type. In such a case, it returns a special COROUTINE_SUSPENDED marker, which we will later see in practice. For now, just notice that since getStudent might return Student? or COROUTINE_SUSPENDED (which is of type Any), its result type must be the closest supertype of Student? and Any, so it is Any?.
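The two possible outcomes can be observed with the low-level intrinsics from the Kotlin standard library. This is only a minimal sketch: immediate, neverResumed, ignoringCompletion, startDirect and startSuspended are hypothetical names made up for the illustration, and startCoroutineUninterceptedOrReturn is used here just to expose the raw Any? return value.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: a completion continuation that ignores the final result.
fun <T> ignoringCompletion(): Continuation<T> =
    Continuation(EmptyCoroutineContext) { }

// Never suspends, so the raw call returns the declared value.
suspend fun immediate(): Int = 42

// Suspends and is never resumed in this sketch.
suspend fun neverResumed(): Int = suspendCoroutine<Int> { /* never resumed */ }

// Raw result of a call that does not suspend.
fun startDirect(): Any? {
    val block: suspend () -> Int = { immediate() }
    return block.startCoroutineUninterceptedOrReturn(ignoringCompletion())
}

// Raw result of a call that suspends.
fun startSuspended(): Any? {
    val block: suspend () -> Int = { neverResumed() }
    return block.startCoroutineUninterceptedOrReturn(ignoringCompletion())
}
```

Here startDirect() returns 42, while startSuspended() returns the COROUTINE_SUSPENDED marker, which is why the compiled signature has to use Any?.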

// code 1 under the hood looks like
fun getEmployee(continuation: Continuation<*>): Any
fun getID(continuation: Continuation<*>): Any

Next, getEmployee() needs its own continuation in order to remember its state. Let's name it GetEmployeeContinuation. When getEmployee() is called, it first checks whether the continuation passed as a parameter is its own continuation (GetEmployeeContinuation) or the caller's continuation. If it is the caller's continuation, this is the first call and execution starts at the beginning of the body. If it is its own continuation, this is a request to resume from the suspension point (getID()). We will see later who makes that request.

At the beginning of its body, getEmployee() wraps the continuation of its caller (the parameter) with its own continuation (GetEmployeeContinuation).

val currentContinuation = GetEmployeeContinuation(continuation)

This should be done only if the continuation isn't wrapped already. If it is, this is part of the resume process and we should keep the continuation unchanged (we will see how this is done in the code).

val currentContinuation =
    if (continuation is GetEmployeeContinuation) continuation
    else GetEmployeeContinuation(continuation)

Now let's look at getID():

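import kotlin.concurrent.thread
import kotlinx.coroutines.suspendCancellableCoroutine

suspend fun getID(): Int {
    var text = "hello world"
    println("before")
    // The continuation is resumed with Unit, so the type argument is Unit.
    suspendCancellableCoroutine<Unit> { getIDContinuation ->
        thread {
            Thread.sleep(1000)
            getIDContinuation.resumeWith(Result.success(Unit))
        }
    }
    text += "hh"
    println("after. $text")
    return 1
}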

Some basics:

inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

inline fun <T> Continuation<T>.resumeWithException(
    exception: Throwable
): Unit = resumeWith(Result.failure(exception))

If the result is successful, the coroutine resumes and the value becomes the return value of the suspension point; otherwise the exception is rethrown right after the suspension point.
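Both outcomes can be tried out with the standard library alone. The following is a minimal sketch rather than production code: saved, awaitValue and runWith are hypothetical names, and the coroutine is started with startCoroutine on an empty context so that everything runs synchronously on the calling thread.

```kotlin
import kotlin.coroutines.*

// Hypothetical holder for the continuation captured at the suspension point.
var saved: Continuation<String>? = null

// Suspends and hands the continuation to whoever wants to resume it later.
suspend fun awaitValue(): String = suspendCoroutine { cont -> saved = cont }

// Starts a coroutine, lets the caller decide how to resume it, and reports what happened.
fun runWith(resumeAction: (Continuation<String>) -> Unit): String {
    var outcome = "not finished"
    val body: suspend () -> Unit = {
        outcome = try {
            "got " + awaitValue()          // resume(value) makes awaitValue() return value
        } catch (e: IllegalStateException) {
            "caught " + e.message          // resumeWithException rethrows here
        }
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })
    resumeAction(saved!!)                  // the coroutine is suspended at this point
    return outcome
}
```

Calling runWith { it.resume("OK") } gives "got OK", and runWith { it.resumeWithException(IllegalStateException("boom")) } gives "caught boom".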

getEmployee() can be started from two places: either from the beginning (on the first call) or from the point after the suspension (when resumed from a continuation). To identify the current state, we use a field called label. At the start it is 0, so the function starts from the beginning. Before each suspension point it is set to the next state, so that after a resume we start just after that suspension point.

The actual code that continuations and suspending functions are compiled to is more complicated; what follows is simplified pseudocode.

getEmployee(): pseudocode

class GetEmployeeContinuation(val cont: Continuation<Unit>) : Continuation<Int> {
    override val context: CoroutineContext
        get() = cont.context

    var label: Int = 0 // tracks the state
    var inc: Int = 0
    var result: Result<Any>? = null

    override fun resumeWith(result: Result<Int>) {
        this.result = result
        val res =
            try {
                // request to resume
                val r = getEmployee(this)
                if (r == COROUTINE_SUSPENDED) return
                Result.success(r as Unit)
            } catch (ex: Throwable) {
                Result.failure(ex)
            }
        cont.resumeWith(res)
    }
}

fun getEmployee(cont: Continuation<*>): Any {
    // Reuse our own continuation on resume; otherwise wrap the caller's continuation.
    val currentContinuation =
        if (cont is GetEmployeeContinuation) cont
        else GetEmployeeContinuation(cont as Continuation<Unit>)
    var result: Result<Any>? = currentContinuation.result
    if (currentContinuation.label == 0) {
        println("Start main()")
        currentContinuation.inc = 0
        // Set the next state so that a resume continues where we left off.
        currentContinuation.label = 1
        val res = getID(currentContinuation)
        if (res == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
        result = Result.success(res as Int)
    }
    if (currentContinuation.label == 1) {
        val id = result!!.getOrThrow() as Int
        currentContinuation.inc = currentContinuation.inc + 1
        println("End main() ${currentContinuation.inc} $id")
        return Unit
    }
    error("Impossible")
}

getID() : pseudo code

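fun getID(cont: Continuation<*>): Any {
    // Reuse our own continuation on resume; otherwise wrap the caller's continuation.
    val currentContinuation =
        if (cont is GetIDContinuation) cont
        else GetIDContinuation(cont as Continuation<Int>)
    if (currentContinuation.label == 0) {
        println("before")
        currentContinuation.text = "hello world"
        currentContinuation.label = 1
        // Stand-in for the suspendCancellableCoroutine block: schedule the
        // resume on another thread and report that we are suspended.
        thread {
            Thread.sleep(1000)
            currentContinuation.resumeWith(Result.success(Unit))
        }
        return COROUTINE_SUSPENDED
    }
    if (currentContinuation.label == 1) {
        currentContinuation.text += "hh"
        println("after ${currentContinuation.text}")
        return 1
    }
    error("Impossible")
}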

class GetIDContinuation(val cont: Continuation<Int>) : Continuation<Unit> {
    override val context: CoroutineContext
        get() = cont.context

    var text: String? = null
    var result: Result<Any>? = null
    var label: Int = 0

    override fun resumeWith(result: Result<Unit>) {
        this.result = result
        val res =
            try {
                val r = getID(this)
                if (r == COROUTINE_SUSPENDED) return
                Result.success(r as Int)
            } catch (ex: Throwable) {
                Result.failure(ex)
            }
        cont.resumeWith(res)
    }
}

Let's walk through one scenario.

getEmployee() is called. It creates its own continuation object (GetEmployeeContinuation) and starts executing from the beginning. Before calling getID(), it updates its state (GetEmployeeContinuation.label = 1) so that execution can later resume from where it was suspended, and it stores its local variables in GetEmployeeContinuation. It then calls getID(), passing its own continuation object.

getID() creates its own continuation object (GetIDContinuation) and starts executing from the beginning. Before reaching its suspension point (the block that calls Thread.sleep(1000) on another thread), getID() sets its label to 1, and then it returns COROUTINE_SUSPENDED. Back in getEmployee(), res = getID(currentContinuation) is therefore COROUTINE_SUSPENDED, so getEmployee() also returns COROUTINE_SUSPENDED. The whole call stack folds, and by returning from the functions the thread becomes free to do something else.

After 1 second this line runs:
getIDContinuation.resumeWith(Result.success(Unit))
so resumeWith() of GetIDContinuation is called with Unit as the result. That is why GetIDContinuation implements Continuation<Unit>.

In resumeWith() of GetEmployeeContinuation

GetEmployeeContinuation is passed to getID() as cont, where it has type Continuation<Int>, which means it expects a value of type Int when it is resumed. When a continuation is resumed, its resumeWith() method is called with a Result parameter, and that Result is stored in the continuation's result field. In this case the value is an Int, and it is stored in a field declared as Result<Any>?, so it is cast back to Int when it is read.

Because GetIDContinuation.label was set to 1 before suspending, execution continues from where it was suspended: resumeWith() of GetIDContinuation calls getID() again, passing its own continuation object (which means "resume"), to get the value of r. getID() completes and returns 1, so r is 1 and res is Result.success(1). Then this line runs: cont.resumeWith(res)

Here cont is the GetEmployeeContinuation, so its resumeWith() is called with Result.success(1) and its result field is updated to that value. GetEmployeeContinuation.label was set to 1 before calling getID(), so execution continues from where it left off. resumeWith() of GetEmployeeContinuation resumes getEmployee() by calling it with its own continuation object. getEmployee() completes and returns Unit, which becomes the value of res, and GetEmployeeContinuation then resumes its caller when cont.resumeWith(res) executes. The process continues in the same way up the entire call stack.

I based this answer on a resource about how coroutines work under the hood.

Selfpronouncing answered 15/6, 2024 at 21:3 Comment(0)
V
25

End-user perspective

For the end user the situation is relatively simple: a continuation represents an execution flow that was suspended. It lets us resume the execution by invoking resume() or resumeWithException().

For example, assume we want to suspend for a second and then resume execution. We ask the coroutines machinery to suspend, it provides a continuation object, we store it, and at a later time we invoke resume() on it. The continuation object "knows" how to resume the execution:

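import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

suspend fun foo() {
    println("foo:1")
    // The explicit type argument says the continuation is resumed with a String.
    val result = suspendCoroutine<String> { cont ->
        thread {
            Thread.sleep(1000)
            cont.resume("OK")
        }
    }
    println("foo:2:$result")
}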

suspendCoroutine() is one of the possible ways to suspend and acquire a continuation to resume later. thread() and Thread.sleep() are just for demo purposes; usually we should use delay() instead.

Very often we suspend to acquire some kind of data. This is why continuations support resuming with a result value. In the example above, the result of suspendCoroutine() is stored as result and we resume the continuation by passing "OK". This way, after resuming, result holds "OK". That explains <in T>: T is the type of the value the continuation accepts when it is resumed, and in marks it as an input-only (contravariant) type parameter.
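One practical consequence of the in modifier is that a continuation accepting a more general type can be passed wherever a more specific one is expected. A small sketch, assuming made-up names deliverString and demoContravariance and using the Continuation(context) { ... } factory function from the standard library:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Hypothetical producer: it only knows how to deliver a String.
fun deliverString(cont: Continuation<String>) = cont.resume("OK")

fun demoContravariance(): List<Any?> {
    val received = mutableListOf<Any?>()
    // A continuation that accepts any value, not just a String.
    val anyConsumer: Continuation<Any?> =
        Continuation(EmptyCoroutineContext) { result -> received += result.getOrNull() }
    // Compiles because Continuation<in T> lets Continuation<Any?> stand in for Continuation<String>.
    deliverString(anyConsumer)
    return received
}
```

demoContravariance() returns a list containing only "OK". Without the in modifier, the call deliverString(anyConsumer) would not compile.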

Internals

This is much more complicated. Kotlin runs on runtimes that don't support coroutines or suspending. For example, the JVM can't wait inside a function without blocking a thread. This is simply not possible (I intentionally ignore Project Loom here). To make this possible, the Kotlin compiler has to manipulate the bytecode, and continuations play an important part in this process.

As you noticed, every suspend function receives an additional parameter of type Continuation. This object is used to control the process of resuming, it helps with returning to the function's caller, and it holds the current coroutine context. Additionally, suspend functions return Any/Object so that they can signal their state to the caller.

Assume we have another function calling the first one:

suspend fun bar() {
    println("bar:1")
    foo()
    println("bar:2")
}

Then we invoke bar(). The bytecode of both foo() and bar() is much more complicated than you would expect from looking at the source code above. This is what happens:

  1. bar() is invoked with the continuation of its caller (let's ignore for now what that means).
  2. bar() checks if it "owns" the passed continuation. It does not, so it assumes this is the continuation of its caller and that this is the initial execution of bar().
  3. bar() creates its own continuation object and stores the caller's continuation inside it.
  4. bar() starts executing as normal and gets to the foo() call.
  5. It stores the local state (the code offset, values of local variables, etc.) in its continuation.
  6. bar() invokes foo(), passing its continuation.
  7. foo() checks if it owns the passed continuation. It doesn't, the continuation is owned by bar(), so foo() creates its own continuation, stores bar()'s continuation in it and starts a normal execution.
  8. Execution gets to suspendCoroutine() and, as before, the local state is stored inside foo()'s continuation.
  9. Continuation of foo() is provided to the end-user inside the lambda passed to suspendCoroutine().
  10. Now, foo() wants to suspend its execution, so it... returns... Yes, as said earlier, waiting without blocking the thread is not possible, so the only way to free the thread is by returning from the function.
  11. foo() returns with a special value that says: "execution was suspended".
  12. bar() reads this special value and also suspends, so also immediately returns.
  13. The whole call stack folds and the thread is free to go do something else.
  14. 1 second passes and we invoke cont.resume().
  15. Continuation of foo() knows how to resume the execution from the suspendCoroutine() point.
  16. Continuation invokes foo() function passing itself as a parameter.
  17. foo() checks if it owns the passed continuation - this time it does, so it assumes this is not an initial call to foo(), but it is a request to resume execution. It reads the stored state from the continuation, it loads local variables and jumps to the proper code offset.
  18. Execution progresses normally until it gets to the point where it needs to return from foo() to bar().
  19. foo() knows that this time it was not invoked by bar(), so simply returning won't work. But it still keeps a continuation of its caller, so bar() suspended at exactly the point where foo() needs to return.
  20. foo() returns with magic value that says: "resume the continuation of my caller".
  21. Continuation of bar() is resumed from the point where it executed foo().
  22. Process continues.
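The steps above can be observed from the outside with a small experiment that uses only the standard library. It is a sketch under a few assumptions: log, pending and runDemo are hypothetical names, println is replaced by appending to a list so the order is easy to inspect, and the continuation is resumed manually instead of from a timer thread.

```kotlin
import kotlin.coroutines.*

val log = mutableListOf<String>()
var pending: Continuation<String>? = null   // the continuation handed out in step 9

suspend fun foo() {
    log += "foo:1"
    val result = suspendCoroutine<String> { cont -> pending = cont }
    log += "foo:2:$result"
}

suspend fun bar() {
    log += "bar:1"
    foo()
    log += "bar:2"
}

fun runDemo(): List<String> {
    log.clear()
    val block: suspend () -> Unit = { bar() }
    // Steps 1-13: runs until foo() suspends, then the whole call stack returns.
    block.startCoroutine(Continuation(EmptyCoroutineContext) { log += "completed" })
    log += "caller is free"    // reached while foo() and bar() are still suspended
    // Steps 14-22: resume foo(), which in turn resumes bar() and then the completion.
    pending!!.resume("OK")
    return log.toList()
}
```

runDemo() returns [bar:1, foo:1, caller is free, foo:2:OK, bar:2, completed], which shows that the calling thread got control back in the middle of bar() and foo().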

As you can see, this is pretty complicated. Normally, users of coroutines should not need to understand how they work internally.

Additional important notes:

  • If foo() did not suspend, it would return normally to bar() and bar() would continue execution as usual. This decreases the overhead of the whole process when suspending is not needed.
  • When resuming, continuations don't invoke their functions directly; they ask the dispatcher to do it. The dispatcher is stored inside the CoroutineContext, so it is also available through the continuation.
  • Note that because continuations keep a reference to the caller's continuation, they form a chain of continuations. This can be used to produce a stack trace, as the real call stack was lost when suspending.
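The second note can be illustrated with the ContinuationInterceptor element from the standard library, which is the hook that dispatchers are built on. This is only a sketch: RecordingInterceptor and demoInterceptor are hypothetical names, and instead of switching threads the interceptor merely records that the resumption went through it.

```kotlin
import kotlin.coroutines.*

// Hypothetical interceptor that records every resumption passing through it.
class RecordingInterceptor(private val events: MutableList<String>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        Continuation(continuation.context) { result ->
            events += "dispatch"             // a real dispatcher would schedule this on a thread
            continuation.resumeWith(result)
        }
}

fun demoInterceptor(): List<String> {
    val events = mutableListOf<String>()
    var pending: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        events += "body:1"
        suspendCoroutine<Unit> { cont -> pending = cont }
        events += "body:2"
    }
    // The interceptor travels in the coroutine context of the completion continuation.
    body.startCoroutine(Continuation(RecordingInterceptor(events)) { })
    pending!!.resume(Unit)
    return events
}
```

demoInterceptor() returns [dispatch, body:1, dispatch, body:2]: both the initial start and the later resume are routed through the element found in the context.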
Versicular answered 11/9, 2022 at 15:3 Comment(4)
If anyone notices any mistakes in this explanation, please let me know :-) - Versicular
Where is the continuation variable stored after step 13? foo() has been removed from the call stack (along with its local variables). - Rupertruperta
@Rupertruperta Local variables and the code offset have been stored inside the continuation object. bar() does this in step 5 and foo() in step 8. And the continuation itself is provided to whoever is going to resume it later (step 9). - Versicular
Thank you. Will go watch some more videos about coroutine internals and Loom. - Comforter
P
0

A continuation represents an execution flow that was suspended. It lets us resume the execution by invoking resume() or resumeWithException().

import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlinx.coroutines.suspendCancellableCoroutine

var continuation: Continuation<Unit>? = null

fun callbackMethod() {
    // Run this method after a task is done.
    // It is a callback method that resumes the stored continuation.
    continuation?.resume(Unit)
}

suspend fun main() {
    suspendCancellableCoroutine<Unit> { cont ->
        continuation = cont
        // do some task that calls callbackMethod at the end
    }
}
Phrixus answered 13/4, 2023 at 7:59 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.