How do Kotlin coroutines work internally?

How does Kotlin implement coroutines internally?

Coroutines are said to be a "lighter version" of threads, and I understand that they use threads internally to execute coroutines.

What happens when I start a coroutine using any of the builder functions?

This is my understanding of running this code:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin has a pre-defined ThreadPool at the beginning.
  2. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).
  3. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).
  4. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).
  5. (C) executes on Thread03.
  6. At (D), the Thread03 is stopped.
  7. After 1000ms, (E) is executed on the next free thread, say Thread01.

Am I understanding this correctly? Or are coroutines implemented in a different way?


Update in 2021: here's an excellent article by Manuel Vivo that complements all the answers below.

Aggravate answered 28/11, 2018 at 19:15
I am trying to build a diagram (or an animation) of how things work internally, something that looks like this: en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg (Aggravate)

Coroutines are a completely separate thing from any scheduling policy that you describe. A coroutine is basically a call chain of suspend funs. Suspension is totally under your control: you just have to call suspendCoroutine. You'll get a callback object so you can call its resume method and get back to where you suspended.

Here's some code where you can see that suspension is a very direct and transparent mechanism, fully under your control:

import kotlin.coroutines.*
import kotlinx.coroutines.*

var continuation: Continuation<String>? = null

fun main(args: Array<String>) {
    val job = GlobalScope.launch(Dispatchers.Unconfined) {
        while (true) {
            println(suspendHere())
        }
    }
    continuation!!.resume("Resumed first time")
    continuation!!.resume("Resumed second time")
}

suspend fun suspendHere() = suspendCancellableCoroutine<String> {
    continuation = it
}

All the code above executes on the same thread, the main thread. There is no multithreading going on at all.

The coroutine you launch suspends itself each time it calls suspendHere(). It writes the continuation callback to the continuation property, and then you explicitly use that continuation to resume the coroutine.

The code uses the Unconfined coroutine dispatcher, which does no dispatching to threads at all; it just runs the coroutine code right where you invoke continuation.resume().
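For comparison, here is a sketch of the same experiment using only the standard library's kotlin.coroutines API (Kotlin 1.3+), without kotlinx.coroutines. With EmptyCoroutineContext there is no dispatcher (no ContinuationInterceptor) at all, so, much like Unconfined in this simple case, the body runs right where startCoroutine and resume are called. The names lastContinuation, suspendHereStd and sameThreadDemo are made up for this illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Holds the most recent suspension point, like the `continuation` property above.
var lastContinuation: Continuation<String>? = null

// Suspends and publishes the continuation; someone else must call resume() on it.
suspend fun suspendHereStd(): String = suspendCoroutine { lastContinuation = it }

fun sameThreadDemo(): List<String> {
    val seen = mutableListOf<String>()
    val body: suspend () -> Unit = {
        while (true) {
            val value = suspendHereStd()
            seen += "$value on ${Thread.currentThread().name}"
        }
    }
    // No dispatcher in the context: the body runs right here until its first suspension.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    // Each resume() runs the body on this thread until it suspends again, then returns.
    lastContinuation!!.resume("Resumed first time")
    lastContinuation!!.resume("Resumed second time")
    return seen
}
```

Called from main, sameThreadDemo() returns two entries, both ending in "on main": the coroutine only ever runs on the thread that calls resume().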


With that in mind, let's revisit your diagram:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin has a pre-defined ThreadPool at the beginning.

It may or may not have a thread pool. A UI dispatcher works with a single thread.

The prerequisite for a thread to be the target of a coroutine dispatcher is that there is a concurrent queue associated with it and the thread runs a top-level loop that takes Runnable objects from this queue and executes them. A coroutine dispatcher simply puts the continuation on that queue.
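A minimal sketch of that arrangement, assuming nothing beyond the standard library: a ConcurrentLinkedQueue of Runnables, a top-level loop that drains it on the current thread, and a "dispatch" that is nothing more than enqueuing a task that resumes a continuation. The helpers dispatch, yieldToLoop, launchOnLoop and runEventLoop are hypothetical; real dispatchers are considerably more involved.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// The run queue of a hypothetical single-threaded "dispatcher".
val runQueue = ConcurrentLinkedQueue<Runnable>()

// "Dispatching" a continuation = enqueuing a task that resumes it.
fun <T> dispatch(cont: Continuation<T>, value: T) {
    runQueue.add(Runnable { cont.resume(value) })
}

// Suspends the caller and schedules its own resumption at the back of the queue.
suspend fun yieldToLoop(): Unit = suspendCoroutine { cont -> dispatch(cont, Unit) }

// Starting a coroutine is also just a queued task.
fun launchOnLoop(block: suspend () -> Unit) {
    val completion = Continuation<Unit>(EmptyCoroutineContext) { result -> result.getOrThrow() }
    runQueue.add(Runnable { block.startCoroutine(completion) })
}

// The thread's top-level loop: take a Runnable, run it, repeat until the queue is empty.
fun runEventLoop() {
    while (true) {
        val task = runQueue.poll() ?: break
        task.run()
    }
}

fun interleavingDemo(): List<String> {
    val log = mutableListOf<String>()
    launchOnLoop { log += "A1"; yieldToLoop(); log += "A2" }
    launchOnLoop { log += "B1"; yieldToLoop(); log += "B2" }
    runEventLoop()
    return log
}
```

interleavingDemo() returns [A1, B1, A2, B2]: two coroutines take turns on one thread, and no thread is ever started or stopped to switch between them.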

  2. At (A), Kotlin starts executing the coroutine in the next available free thread (Say Thread01).

It can also be the same thread where you called launch.

  3. At (B), Kotlin stops executing the current thread, and starts the suspending function loadData() in the next available free thread (Thread02).

Kotlin has no need to stop any threads in order to suspend a coroutine. In fact, the main point of coroutines is that threads don't get started or stopped. The thread's top-level loop will go on and pick another runnable to run.

Furthermore, the mere fact that you're calling a suspend fun has no significance. The coroutine will only suspend itself when it explicitly calls suspendCoroutine. The function may also simply return without suspension.
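A small sketch of this point, again with only kotlin.coroutines: the three hypothetical functions below are all suspend funs, but only the last one actually leaves the coroutine suspended. Note that calling suspendCoroutine is not enough by itself: if the block resumes the continuation before it returns, the call simply returns the value. completesSynchronously (also hypothetical) starts a coroutine on the current thread and reports whether it ran to the end before startCoroutine returned.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var parked: Continuation<Int>? = null

// A suspend fun that never calls suspendCoroutine: it just returns.
suspend fun plainReturn(): Int = 42

// Calls suspendCoroutine but resumes before the block returns, so nothing suspends.
suspend fun resumedInline(): Int = suspendCoroutine { cont -> cont.resume(7) }

// Calls suspendCoroutine and leaves the continuation for someone else to resume later.
suspend fun reallySuspends(): Int = suspendCoroutine { cont -> parked = cont }

// Starts `block` on the current thread; true if it finished before startCoroutine returned.
fun completesSynchronously(block: suspend () -> Int): Boolean {
    var done = false
    block.startCoroutine(Continuation(EmptyCoroutineContext) { done = true })
    return done
}
```

completesSynchronously { plainReturn() } and completesSynchronously { resumedInline() } both return true; only completesSynchronously { reallySuspends() } returns false.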

But let's assume it did call suspendCoroutine. In that case the coroutine is no longer running on any thread. It is suspended and can't continue until some code, somewhere, calls continuation.resume(). That code could be running on any thread, any time in the future.
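The following sketch shows the "any thread" part with the standard library alone: the coroutine suspends on the calling thread, and a separate thread (named "resumer" here, an arbitrary choice) calls resume. With no dispatcher in the context, the rest of the body simply runs on the resuming thread. resumeFromOtherThread is a hypothetical helper.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Returns (thread before suspension, thread after resumption).
fun resumeFromOtherThread(): Pair<String, String> {
    var savedCont: Continuation<Unit>? = null
    var before = ""
    var after = ""
    val finished = CountDownLatch(1)
    suspend {
        before = Thread.currentThread().name
        suspendCoroutine<Unit> { cont -> savedCont = cont }
        after = Thread.currentThread().name
    }.startCoroutine(Continuation(EmptyCoroutineContext) { finished.countDown() })
    // Here the coroutine is suspended: it is not running on any thread.
    thread(name = "resumer") { savedCont!!.resume(Unit) }
    finished.await() // wait until the body has run to completion on the resumer thread
    return before to after
}
```

The first element is the caller's thread name; the second is "resumer".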

  4. When (B) returns after execution, Kotlin continues the coroutine in the next available free thread (Thread03).

B doesn't "return after execution"; the coroutine resumes while still inside B's body. It may suspend and resume any number of times before returning.
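To illustrate, here is a hypothetical loadDataSketch that stands in for loadData() and suspends twice inside its body. The caller regains control at each suspension, and the body only returns after the second resume. pauseHere, pendingResume and multiResumeDemo are made up for this sketch.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var pendingResume: Continuation<Unit>? = null

// One suspension point: publish the continuation and suspend.
suspend fun pauseHere(): Unit = suspendCoroutine { pendingResume = it }

// Hypothetical stand-in for loadData(): two suspension points inside one body.
suspend fun loadDataSketch(trace: MutableList<String>): String {
    trace += "start"
    pauseHere()
    trace += "after first resume"
    pauseHere()
    trace += "after second resume"
    return "data"
}

fun multiResumeDemo(): List<String> {
    val trace = mutableListOf<String>()
    suspend { loadDataSketch(trace) }
        .startCoroutine(Continuation(EmptyCoroutineContext) { trace += "returned ${it.getOrThrow()}" })
    trace += "caller regains control"
    pendingResume!!.resume(Unit) // continues inside the body, up to the second pauseHere()
    pendingResume!!.resume(Unit) // continues to the end; the completion sees "data"
    return trace
}
```

multiResumeDemo() returns [start, caller regains control, after first resume, after second resume, returned data].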

  5. (C) executes on Thread03.
  6. At (D), the Thread03 is stopped.
  7. After 1000ms, (E) is executed on the next free thread, say Thread01.

Again, no threads are being stopped. The coroutine gets suspended and a mechanism, usually specific to the dispatcher, is used to schedule its resumption after 1000 ms. At that point it will be added to the run queue associated with the dispatcher.
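A rough sketch of how a delay could work under these assumptions: a single-threaded event loop fed by a LinkedBlockingQueue, and a ScheduledExecutorService that, when the time is up, merely enqueues a task that resumes the continuation. This is not how kotlinx.coroutines implements delay; loopQueue, timerExecutor, sketchDelay and delayDemo are hypothetical.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Run queue of a hypothetical single-threaded event loop.
val loopQueue = LinkedBlockingQueue<Runnable>()

// Hypothetical timer thread: it never runs coroutine code, it only enqueues the resumption.
val timerExecutor = Executors.newSingleThreadScheduledExecutor { r ->
    Thread(r, "timer").apply { isDaemon = true }
}

// A delay() look-alike: suspend now; after `millis`, put "resume this continuation" on the run queue.
suspend fun sketchDelay(millis: Long): Unit = suspendCoroutine { cont ->
    timerExecutor.schedule(Runnable { loopQueue.put(Runnable { cont.resume(Unit) }) }, millis, TimeUnit.MILLISECONDS)
}

// Returns (thread before delay, thread after delay, elapsed milliseconds).
fun delayDemo(): Triple<String, String, Long> {
    var before = ""
    var after = ""
    var done = false
    val start = System.nanoTime()
    loopQueue.put(Runnable {
        suspend {
            before = Thread.currentThread().name
            sketchDelay(100)
            after = Thread.currentThread().name
        }.startCoroutine(Continuation(EmptyCoroutineContext) { done = true })
    })
    // The loop thread just waits in take() for the next task; while suspended,
    // the coroutine itself occupies no thread at all.
    while (!done) loopQueue.take().run()
    return Triple(before, after, (System.nanoTime() - start) / 1_000_000)
}
```

Both halves of the body run on the loop thread (the caller of delayDemo), and at least 100 ms pass in between.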


To be specific, here are some examples of the code it takes to dispatch a coroutine.

Swing UI dispatcher:

EventQueue.invokeLater { continuation.resume(value) }

Android UI dispatcher:

mainHandler.post { continuation.resume(value) }

ExecutorService dispatcher:

executor.submit { continuation.resume(value) } 
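The same idea can be expressed with the standard library's ContinuationInterceptor, the hook that kotlinx.coroutines' CoroutineDispatcher builds on. The hypothetical ExecutorInterceptor below wraps every continuation so that resuming it posts the resumption to an Executor, much like the executor example above. Even though a thread named "resumer" calls resume, the body continues on the executor's "worker" thread (both names are arbitrary choices for this sketch).

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical interceptor: every resumption of the coroutine is re-posted to `executor`.
class ExecutorInterceptor(private val executor: Executor) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Returns the thread names observed before and after the suspension.
fun interceptorDemo(): List<String> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "worker").apply { isDaemon = true } }
    val threads = mutableListOf<String>()
    val finished = CountDownLatch(1)
    suspend {
        threads += Thread.currentThread().name
        suspendCoroutine<Unit> { cont -> thread(name = "resumer") { cont.resume(Unit) } }
        threads += Thread.currentThread().name
    }.startCoroutine(Continuation(ExecutorInterceptor(executor)) { finished.countDown() })
    finished.await()
    return threads
}
```

interceptorDemo() returns [worker, worker]: even the initial start goes through the interceptor, and the resumer thread never runs any of the coroutine's code.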
Tangent answered 28/11, 2018 at 19:27
So, when I use Dispatchers.IO for a suspending fun, it will spawn a new thread and process it on an IO thread, right? After processing the suspending fun, the IO thread is stopped, and the Continuation resumes from where it left off, on the Main thread or a thread which is defined by the CoroutineContext. Is that correct? (Aggravate)
I am trying to prepare a diagram (or an animation) of how things work internally, something that looks like this: en.wikipedia.org/wiki/Thread_pool#/media/File:Thread_pool.svg (Aggravate)
I guess you mean when you use withContext(Dispatchers.IO) { ... work ... }. You would use this for a non-suspending fun to avoid it blocking your current thread. The runtime will suspend your current coroutine and resume it on a background thread. When the withContext() block is done, it will again suspend and resume on your initial thread. So it will be the same coroutine all the time, just jumping from thread to thread (just like a thread can jump from core to core). (Tangent)
So basically a coroutine is a collection of many Continuations, and a Continuation is just like a callback [under the hood], which is executed by other suspending functions after their work is done. And a Dispatcher is responsible for scheduling these Continuations by placing them into a concurrent queue (either main or IO, based on the developer's preference). Right? (Aggravate)
Continuation is a callback above the hood; its clients use it as such. Internally it's an optimized implementation that allows reusing the same callback object to resume from any suspension point within a function body. A coroutine is associated with a linked list of continuations which mimics the call stack. Once a suspend fun completes, it resumes the continuation of its caller with the result. This is the essence of "continuation-passing style". (Tangent)
One point I still want to clarify: assuming no threads are stopped and restarted, there are potentially always a few threads up and running that constantly check that concurrent queue for a piece of work to run when they are in the READY state, right? (Theodora)
There are different strategies a given event loop implementation can use. Two basic ones are exponential backoff, where you go to sleep for progressively longer times (but always a limited time), and the wait-notify mechanism, where you go to sleep until the producer thread sends you a signal. (Tangent)

Coroutines work by creating a switch over possible resume points:

class MyClass$Coroutine extends CoroutineImpl {
    public Object doResume(Object o, Throwable t) {
        switch(super.state) {
        default:
                throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
        case 0:  {
             // code before first suspension
             state = 1; // or something else depending on your branching
             break;
        }
        case 1: {
            ...
        }
        }
        return null;
    }
}

The code that runs this coroutine creates an instance of that class and calls doResume() every time it needs to resume execution; when and on which thread that happens depends on the scheduler used for execution.
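Here is a hand-written Kotlin sketch of such a state machine for a body that logs Before, suspends once, and logs After. It mirrors the shape of the decompiled class (a label field, one branch per resume point, an error for unexpected states), but it is a simplified illustration, not the compiler's actual output; BeforeAfterMachine, Suspended, suspendPoint and stateMachineDemo are hypothetical names.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for the real COROUTINE_SUSPENDED marker.
object Suspended

// Hand-written equivalent of a coroutine body: log("Before"); <suspend>; log("After")
class BeforeAfterMachine(
    private val log: MutableList<String>,
    // Hypothetical suspension point (plays the role of delay): receives the continuation to resume later.
    private val suspendPoint: (Continuation<Unit>) -> Unit
) : Continuation<Unit> {
    private var label = 0 // which branch to run on the next resume

    override val context: CoroutineContext = EmptyCoroutineContext

    // Every resume re-enters the same object and jumps by `label`.
    override fun resumeWith(result: Result<Unit>) {
        doResume(result)
    }

    fun doResume(result: Result<Unit>): Any = when (label) {
        0 -> {
            result.getOrThrow()
            log += "Before"
            label = 1          // record the resume point before suspending
            suspendPoint(this) // hand out `this` as the continuation
            Suspended
        }
        1 -> {
            result.getOrThrow()
            log += "After"
            Unit
        }
        else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
    }
}

fun stateMachineDemo(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<Unit>? = null
    val machine = BeforeAfterMachine(log) { saved = it }
    if (machine.doResume(Result.success(Unit)) === Suspended) log += "suspended"
    saved!!.resumeWith(Result.success(Unit)) // the "scheduler" resumes it later
    return log
}
```

stateMachineDemo() returns [Before, suspended, After]: the first doResume() call stops at the suspension point, and the later resume jumps straight to branch 1.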

Here is an example compilation for a simple coroutine:

launch {
    println("Before")
    delay(1000)
    println("After")
}

It compiles to this bytecode:

private kotlinx.coroutines.experimental.CoroutineScope p$;

public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
Code:
   0: invokestatic  #18                 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
   3: astore        5
   5: aload_0
   6: getfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
   9: tableswitch   { // 0 to 1
                 0: 32
                 1: 77
           default: 102
      }
  32: aload_2
  33: dup
  34: ifnull        38
  37: athrow
  38: pop
  39: aload_0
  40: getfield      #24                 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
  43: astore_3
  44: ldc           #26                 // String Before
  46: astore        4
  48: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  51: aload         4
  53: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  56: sipush        1000
  59: aload_0
  60: aload_0
  61: iconst_1
  62: putfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
  65: invokestatic  #44                 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
  68: dup
  69: aload         5
  71: if_acmpne     85
  74: aload         5
  76: areturn
  77: aload_2
  78: dup
  79: ifnull        83
  82: athrow
  83: pop
  84: aload_1
  85: pop
  86: ldc           #46                 // String After
  88: astore        4
  90: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  93: aload         4
  95: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  98: getstatic     #52                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
 101: areturn
 102: new           #54                 // class java/lang/IllegalStateException
 105: dup
 106: ldc           #56                 // String call to \'resume\' before \'invoke\' with coroutine
 108: invokespecial #60                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
 111: athrow

I compiled this with kotlinc 1.2.41, so the listing refers to the pre-1.3 experimental package, kotlin.coroutines.experimental.

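From 32 to 76 is the code for printing Before and calling delay(1000). Before the call, the label is set to 1 (offset 62); if delay returns the COROUTINE_SUSPENDED marker, doResume returns it too (offsets 69 to 76), which is what suspends the coroutine.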

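From 77 to 101 is the code for printing After. Execution enters at 77 when the coroutine is resumed with label 1, or jumps to 85 if delay returned without suspending.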

From 102 to 111 is error handling for illegal resume states, as denoted by the default label in the switch table.
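The check around offset 71 (compare the callee's result with COROUTINE_SUSPENDED and return the marker if they match) is the convention compiled suspend functions use. With the stable Kotlin 1.3+ API (kotlin.coroutines.intrinsics rather than the experimental package in the listing above), the sketch below calls a compiled suspend lambda directly and shows what its first invocation returns. firstCallReturns and parkedAt are hypothetical.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.suspendCoroutine

var parkedAt: Continuation<Unit>? = null

// Invokes the compiled body once and returns what that first call produced:
// either the final result or the COROUTINE_SUSPENDED marker.
fun firstCallReturns(suspendInBody: Boolean): Any? {
    val body: suspend () -> String = {
        if (suspendInBody) suspendCoroutine<Unit> { parkedAt = it }
        "After"
    }
    return body.startCoroutineUninterceptedOrReturn(Continuation(EmptyCoroutineContext) { })
}
```

firstCallReturns(false) returns "After" straight away, while firstCallReturns(true) returns the COROUTINE_SUSPENDED marker; in that case the result would later be delivered to the completion continuation instead.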

So, in summary, coroutines in Kotlin are simply state machines that are driven by some scheduler.

Avan answered 28/11, 2018 at 20:47
That essentially means that a coroutine is internally divided into several cases of a switch. Continuations will be executed like executeCase(1), executeCase(2), executeCase(N). Correct? (Aggravate)
The state is updated by the doResume method, which modifies a field it inherits from CoroutineImpl. Control then returns to the caller (the scheduler), which resumes the coroutine at a later point: it may first do something else, or resume immediately. (Avan)
