Preface

A coroutine is a concurrency scheme. It is also a way of thinking.

Coroutines are traditionally single-threaded. For IO-intensive tasks they consume less memory than threads and are therefore more efficient, but for computationally intensive tasks they are not as efficient as multi-threaded parallel computing.

Different languages implement coroutines differently, and even a single language may have different implementations for different platforms and operating systems.

The Kotlin coroutines discussed here are the implementation for the JVM. Underneath, they still run on Java threads.

Basic knowledge

Ecosystem architecture

Dependencies

dependencies {
    // Kotlin
    implementation "org.jetbrains.kotlin:kotlin-stdlib:1.4.32"

    // Coroutine core library
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3"
    // Coroutine Android support library
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.3"
    // Coroutine Java 8 support library
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3"

    // Lifecycle KTX extensions that wrap coroutines
    implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
    implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
    implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
}

Why do some people find coroutines obscure?

1. There is no detailed, agreed definition of a coroutine online, and the implementation differs for each language and system. Opinions vary, and all the talk about kernel mode, user mode and so on easily leaves us with misconceptions.

2. The interference caused by Kotlin’s various kinds of syntactic sugar, such as:

  • Higher-order functions
  • Implementation classes that cannot be found in the source code

Therefore, a solid foundation in Kotlin syntax is a prerequisite for learning coroutines.

If something is unclear, decompile it to Java and take the decompiled Java as the final authority.

What is a coroutine? What is it for?

I’m sorry, I can only tell you what a Kotlin coroutine is; I don’t know much about coroutines in other languages.

What a coroutine in Kotlin does is flatten asynchronous callback code so that it reads like synchronous code. Other than that, nothing special.

Creating a coroutine means that the compiler quietly generates a set of code behind the scenes: a state machine.

Through suspend and resume, the state machine moves from one state to the next, turning layers of nested callbacks into code as intuitive and concise as synchronous code.

It is not a threading framework, and it has nothing to do with sophisticated notions of kernel mode or user mode. For us on Android, it is essentially syntactic sugar over callback functions.
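As a rough sketch of what this flattening looks like, the following compares a nested-callback version with a suspend version of the same logic. The names fetchUser, fetchOrders and the load functions are hypothetical, and the callbacks complete synchronously to keep the example small; only the kotlin.coroutines part of the standard library is assumed.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style APIs; they invoke the callback synchronously.
fun fetchUser(id: Int, callback: (String) -> Unit) = callback("user-$id")
fun fetchOrders(user: String, callback: (List<String>) -> Unit) =
    callback(listOf("$user/order-1", "$user/order-2"))

// Callback style: each step nests inside the previous one.
fun loadWithCallbacks(id: Int, done: (Int) -> Unit) {
    fetchUser(id) { user ->
        fetchOrders(user) { orders ->
            done(orders.size)
        }
    }
}

// Suspend wrappers around the same callback APIs.
suspend fun fetchUserSuspending(id: Int): String =
    suspendCoroutine { cont -> fetchUser(id) { cont.resume(it) } }

suspend fun fetchOrdersSuspending(user: String): List<String> =
    suspendCoroutine { cont -> fetchOrders(user) { cont.resume(it) } }

// Coroutine style: the same logic reads from top to bottom.
suspend fun loadSuspending(id: Int): Int {
    val user = fetchUserSuspending(id)
    val orders = fetchOrdersSuspending(user)
    return orders.size
}

// Starts the suspend version using only the standard library and
// hands the final result to an ordinary callback.
fun runLoadSuspending(id: Int, done: (Int) -> Unit) {
    val block: suspend () -> Int = { loadSuspending(id) }
    block.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { result ->
        done(result.getOrThrow())
    })
}
```

Both loadWithCallbacks and runLoadSuspending deliver the same number to their callback; the difference is only in how the middle of the code reads.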

This article analyzes the implementation of coroutines in depth, centered on suspend and resume.

Review of Kotlin function basics

Functions in Kotlin are first-class citizens and have their own types.

Function types

fun foo() {}
// The type is () -> Unit
fun foo(p0: Int): String = p0.toString()
// The type is (Int) -> String

class Foo {
    fun bar(p0: String, p1: Long): Any = "$p0$p1"
}
// The type of bar is Foo.(String, Long) -> Any
// Foo is the receiver of bar. The type can also be written as (Foo, String, Long) -> Any

Both functions have the same name? That cannot be helped; those are the rules. The compiler has to infer which one is meant from the expected type, for example:

val f: () -> Unit = ::foo
val g: (Int) -> String = ::foo

Writing it with a receiver

class Foo {
    fun bar(p0: String, p1: Long): Any = "$p0$p1"
}

val h: (Foo, String, Long) -> Any = Foo::bar

A function reference with a bound receiver:

val foo = Foo()
val m: (String, Long) -> Any = foo::bar

Extra knowledge

val x: (Foo, String, Long) -> Any = Foo::bar
val y: Function3<Foo, String, Long, Any> = x

// These three types are interchangeable:
// Foo.(String, Long) -> Any  =  (Foo, String, Long) -> Any  =  Function3<Foo, String, Long, Any>
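To check the equivalence of these function types, here is a small self-contained sketch. The body of bar and the function functionTypeDemo are made up for illustration; the point is only that the same reference can be assigned across the three type forms and called in each style.

```kotlin
class Foo {
    // Illustrative body: joins the two arguments so the calls are distinguishable.
    fun bar(p0: String, p1: Long): Any = "$p0-$p1"
}

fun functionTypeDemo(): List<Any> {
    // An unbound reference typed with a receiver.
    val withReceiver: Foo.(String, Long) -> Any = Foo::bar
    // The same value viewed as a plain three-argument function type.
    val plain: (Foo, String, Long) -> Any = withReceiver
    // The same value viewed through the FunctionN interface.
    val asInterface: Function3<Foo, String, Long, Any> = plain

    // A bound reference: the receiver is fixed, so only two parameters remain.
    val foo = Foo()
    val bound: (String, Long) -> Any = foo::bar

    return listOf(
        foo.withReceiver("a", 1L),        // extension-style call
        plain(foo, "b", 2L),              // ordinary call
        asInterface.invoke(foo, "c", 3L), // explicit invoke
        bound("d", 4L)                    // receiver already bound
    )
}
```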

Passing functions as arguments

fun yy(p: (Foo, String, Long) -> Any) {
    p(Foo(), "Hello", 3L) // call p directly
    // p.invoke(Foo(), "Hello", 3L) // the invoke form also works
}

Lambda

An anonymous function is a function without a name, which sounds like stating the obvious.

// A normal function
fun func() {
    println("hello")
}

// Drop the function name func to make it anonymous
fun() {
    println("hello")
}

// It can be assigned to a variable
val func = fun() {
    println("hello")
}

// The type of the anonymous function
val func: () -> Unit = fun() {
    println("hello")
}

// Lambda expression
val func = {
    print("Hello")
}

// Lambda type
val func: () -> String = {
    print("Hello")
    "Hello" // In a lambda, the last expression is the return value, so return is omitted. Ordinary functions do not work this way.
}

// Lambda with a parameter
val f1: (Int) -> Unit = { p: Int ->
    print(p)
}
// Can be further simplified as
val f1 = { p: Int ->
    print(p)
}
// When there is only one parameter, it can also be written as
val f1: (Int) -> Unit = {
    print(it)
}

A summary of my personal experience with functions

Named functions and anonymous functions look alike, but decompiling to Java shows a slight difference.

If you just use a normal function, it’s just like a normal Java method.

public void a() {}

But if you use the function through a function reference (::a) instead of calling a() directly, the compiler generates a Function0 object for it.

Suspend functions

A suspend function is a function marked with the suspend modifier.

A suspend function can call any function.

A non-suspend function can only call non-suspend functions.

In other words, a suspend function can only be called from another suspend function.

A simple suspend function:

//com.example.studycoroutine.chapter.CoroutineRun.kt
suspend fun suspendFun(): Int {
    return 1
}

What’s special about suspend functions? Decompiled to Java, suspendFun becomes:

public static final Object suspendFun(Continuation completion) {
    return Boxing.boxInt(1);
}

This is why a suspend function can only be called from a suspend function.

The fussy suspend function only does its job once you satisfy it: you have to hand it a ball, the Continuation parameter.

When it wants to call other suspend functions, it simply passes the ball on to them.

An ordinary function has no such ball, so it cannot call a suspend function.
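The idea of passing the ball along can be imitated by hand. The sketch below is not what the compiler generates; it is a simplified continuation-passing version with hypothetical names (plusOneCps, plusOneThenDoubleCps, callFromOrdinaryCode), written with the standard library Continuation type so that the ball is visible as an explicit parameter.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Receives the ball explicitly and reports its result through it.
fun plusOneCps(x: Int, ball: Continuation<Int>) {
    ball.resume(x + 1)
}

// Calls another continuation-passing function by handing it a new ball
// that doubles the result and forwards it to the caller's ball.
fun plusOneThenDoubleCps(x: Int, ball: Continuation<Int>) {
    plusOneCps(x, Continuation<Int>(ball.context) { result ->
        ball.resumeWith(result.map { it * 2 })
    })
}

// Ordinary code has no ball of its own, so here the first one is built by hand.
fun callFromOrdinaryCode(x: Int): Int {
    var answer = 0
    plusOneThenDoubleCps(x, Continuation<Int>(EmptyCoroutineContext) { result ->
        answer = result.getOrThrow()
    })
    return answer
}
```

With real suspend functions the compiler adds the Continuation parameter and threads it through for you, which is why an ordinary function, having no Continuation to pass, cannot make the call.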

At this point, you may have some questions:

  • Question 1. Isn’t this a chicken-and-egg problem? Where does the first ball come from?

  • Question 2. Why does the return type change after compilation?

  • Question 3. If suspendFun is called inside the coroutine body, who is its ball?

The standard library gives us the most primitive tools

public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

Start a coroutine in the simplest way possible.

Demo-K1

import kotlin.concurrent.thread
import kotlin.coroutines.*
import kotlinx.coroutines.CoroutineName

fun main() {
    val b = suspend {
        val a = hello2()
        a
    }
    b.createCoroutine(MyContinuation()).resume(Unit)
}

suspend fun hello2() = suspendCoroutine<Int> {
    thread {
        Thread.sleep(1000)
        it.resume(10086)
    }
}

class MyContinuation : Continuation<Int> {
    override val context: CoroutineContext = CoroutineName("Co-01")
    override fun resumeWith(result: Result<Int>) {
        println("MyContinuation resumeWith result = ${result.getOrNull()}")
    }
}

How the two coroutine-creating functions differ

startCoroutine returns no value, while createCoroutine returns a Continuation.

The main difference appears to be that startCoroutine calls resume(Unit) itself, so it does not need the SafeContinuation wrapper. createCoroutine returns a Continuation to the caller, and since nobody knows when or where the caller will call resume, it must guarantee that resume takes effect only once; that is why the result is wrapped in a SafeContinuation.

The purpose of SafeContinuation is to ensure that suspension happens only when the call is really asynchronous.
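The difference in timing between the two functions can be observed with a small sketch. The function compareStartAndCreate and its event strings are made up for illustration; it uses an empty coroutine context, so nothing is dispatched to another thread and the order of events is deterministic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Records the order of events for both ways of launching the same suspend lambda.
fun compareStartAndCreate(): List<String> {
    val events = mutableListOf<String>()
    val block: suspend () -> Int = {
        events += "body runs"
        42
    }
    val completion = Continuation<Int>(EmptyCoroutineContext) { result ->
        events += "completed with ${result.getOrNull()}"
    }

    // startCoroutine creates the coroutine and resumes it immediately.
    block.startCoroutine(completion)
    events += "startCoroutine returned"

    // createCoroutine only creates it; nothing runs until resume(Unit) is called.
    val created: Continuation<Unit> = block.createCoroutine(completion)
    events += "createCoroutine returned"
    created.resume(Unit)
    return events
}
```

With startCoroutine the body has already run by the time the call returns, whereas with createCoroutine the body runs only after the explicit resume(Unit).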

Analyzing createCoroutineUnintercepted

//kotlin.coroutines.intrinsics.CoroutinesIntrinsicsH.kt
@SinceKotlin("1.3")
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>): Continuation<Unit>

The conclusion first

It can simply be understood as a Kotlin-level primitive that returns the coroutine body.

Analysis

Start with Demo-K1: b is an anonymous function, so it must be compiled to some FunctionX. Because it is also marked suspend, it cannot be the same as an ordinary anonymous function.

The decompiled code is:

public static final void main() {
   Function1 var0 = (Function1)(new Function1((Continuation)null) {
      int label;

      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         Object var10000;
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            this.label = 1;
            var10000 = TestSampleKt.hello2(this);
            if (var10000 == var3) {
               return var3;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }

         int a = ((Number)var10000).intValue();
         return Boxing.boxInt(a);
      }

      @NotNull
      public final Continuation create(@NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function1 var2 = new <anonymous constructor>(completion);
         return var2;
      }

      public final Object invoke(Object var1) {
         return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
      }
   });
   boolean var1 = false;
   Continuation var7 = ContinuationKt.createCoroutine(var0, (Continuation)(new MyContinuation()));
   Unit var8 = Unit.INSTANCE;
   boolean var2 = false;
   Companion var3 = Result.Companion;
   boolean var5 = false;
   Object var6 = Result.constructor-impl(var8);
   var7.resumeWith(var6);
}

The statement Function1 var0 = new Function1(...) creates an object; by itself this has nothing to do with coroutines. It is simply how the compiler translates an anonymous function.

If we write it directly:

fun main() {
    suspend {
        val a = hello2()
        a
    }.createCoroutine(MyContinuation()).resume(Unit)
}

a Function1 var0 = new Function1(...) is still created.

Answering Question 1

Follow the call into createCoroutine.

Continue into createCoroutineUnintercepted and find its implementation for the JVM platform:

//kotlin.coroutines.intrinsics.IntrinsicsJVM.class
@SinceKotlin("1.3")
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    // probeCompletion is still the completion object we passed in, which in Demo-K1 is the MyContinuation instance
    val probeCompletion = probeCoroutineCreated(completion) // I looked at probeCoroutineCreated; this is my understanding
    // this is the suspend lambda, which in Demo-K1 is b
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        // The else branch is not taken in our demo.
        // It is used when createCoroutineUnintercepted is called on a suspend lambda that does not inherit from BaseContinuationImpl.
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

Calling create

@NotNull
public final Continuation create(@NotNull Continuation completion) {
    Intrinsics.checkNotNullParameter(completion, "completion");
    Function1 var2 = new <anonymous constructor>(completion);
    return var2;
}

create takes the completion, constructs a new Function1, and returns it as a Continuation. This is the coroutine body object that has been created, and the heart of the coroutine is its internal state machine, the invokeSuspend function.
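To make the role of label and invokeSuspend more tangible, here is a hand-written, simplified analogue of the generated class. Everything in it is hypothetical (FakeAsync, HandWrittenStateMachine, runHandWrittenStateMachine); the real generated class extends internal standard library classes, handles exceptions and interception, and is not written like this. The fake hello2 parks the continuation instead of starting a thread, so the sketch stays deterministic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

// Stand-in for hello2 in continuation-passing form: it stores the ball
// and reports that it has suspended.
object FakeAsync {
    var pending: Continuation<Int>? = null

    fun hello2(cont: Continuation<Int>): Any? {
        pending = cont
        return COROUTINE_SUSPENDED
    }
}

// label stores the current state; invokeSuspend is re-entered on every resume.
class HandWrittenStateMachine(private val completion: Continuation<Int>) : Continuation<Any?> {
    private var label = 0

    override val context: CoroutineContext
        get() = completion.context

    private fun invokeSuspend(result: Result<Any?>): Any? {
        val value: Any? = when (label) {
            0 -> {
                result.getOrThrow()                       // first entry, resumed with Unit
                label = 1                                 // next resume continues at state 1
                val returned = FakeAsync.hello2(this)     // pass ourselves as the ball
                if (returned === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
                returned
            }
            1 -> result.getOrThrow()                      // resumed with hello2's result
            else -> throw IllegalStateException("unexpected label $label")
        }
        return value
    }

    // Simplified: exceptions are not forwarded to completion in this sketch.
    override fun resumeWith(result: Result<Any?>) {
        val outcome = invokeSuspend(result)
        if (outcome === COROUTINE_SUSPENDED) return       // suspended, wait for the next resume
        completion.resume(outcome as Int)
    }
}

// Drives the machine: the first resume enters state 0, the second delivers the value.
fun runHandWrittenStateMachine(): List<Int?> {
    var finalResult: Int? = null
    val machine = HandWrittenStateMachine(
        Continuation<Int>(EmptyCoroutineContext) { finalResult = it.getOrThrow() }
    )
    machine.resume(Unit)
    val afterFirstResume = finalResult                    // still null: the machine is suspended
    FakeAsync.pending!!.resume(10086)
    return listOf(afterFirstResume, finalResult)
}
```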

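Supplement: related class inheritance relationships

The class generated for a suspend lambda extends SuspendLambda, which extends ContinuationImpl, which extends BaseContinuationImpl, which implements Continuation.

Answering Questions 2 and 3

As we know, a coroutine is started by calling resume on its Continuation. That call eventually arrives at BaseContinuationImpl::resumeWith.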

internal abstract class BaseContinuationImpl {
    fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) // Call the state machine
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // This completion is the first ball that was stuffed in.
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

Excerpt from the state machine code

public final Object invokeSuspend(@NotNull Object $result) {
   Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   Object var10000;
   switch(this.label) {
   case 0: // label is 0 on the first call
      ResultKt.throwOnFailure($result);
      // label is changed to 1, so the next time the coroutine is resumed, case 1 is taken
      this.label = 1;
      // Attention, please: this is the coroutine body object
      var10000 = TestSampleKt.hello2(this);
      if (var10000 == var3) {
         return var3;
      }
      break;
   case 1:
      ResultKt.throwOnFailure($result);
      var10000 = $result;
      break;
   default:
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
   }

   int a = ((Number)var10000).intValue();
   return Boxing.boxInt(a);
}

This answers Question 3: the ball passed in is the Continuation created by create, that is, the coroutine body object itself.

Finally, Question 2: why does a decompiled suspend function return Object?

Take hello2 as an example. It can return either the COROUTINE_SUSPENDED marker that represents suspension (the “whiteboard”) or an actual result. If it returns the marker, the state machine returns and the coroutine is suspended. If it returns a result, hello2 has finished executing; it is then a suspend function that never suspended, and the compiler usually warns that the suspend modifier is redundant. The return type must cover both cases, so it is Object. It is a design necessity and there is no deeper reason.

Apart from the case where a result is returned directly, a suspend function always finishes through resume, delivering either a result or an exception. That is what it means for the suspend function to have returned.

The point of calling resume is to call BaseContinuationImpl’s resumeWith again, which wakes up the state machine and continues executing the coroutine body.

In other words, a custom suspend function must use suspendCoroutine to obtain the Continuation, that is, the state machine object; otherwise it cannot truly suspend and resume.

suspendCoroutine

We don’t have to use suspendCoroutine; the more direct suspendCoroutineUninterceptedOrReturn also works, but then we have to return the COROUTINE_SUSPENDED marker by hand. Be careful to return the marker exactly when the function really suspends and not otherwise, because getting this wrong produces many unexpected results.

suspend fun mySuspendOne() = suspendCoroutineUninterceptedOrReturn<String> { continuation ->
    thread {
        TimeUnit.SECONDS.sleep(1)
        continuation.resume("hello world")
    }
    // Because the block has not returned the real result, it must return the suspension marker; otherwise BaseContinuationImpl would consider the task complete.
    // Our thread would still be running, uncancelled, which produces many unexpected results.
    kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}
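The two possible return paths can be seen side by side in a small sketch. FakeCache, lookup and runLookupDemo are hypothetical names; the cache hit returns a value directly (no suspension), while the miss parks the continuation and returns the marker. An empty context is used, so nothing is dispatched to another thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Hypothetical cache plus a slot that parks the continuation on a miss.
object FakeCache {
    val values = mutableMapOf("a" to 1)
    var waiting: Continuation<Int>? = null
}

// Returns the value directly on a hit and the suspension marker on a miss.
suspend fun lookup(key: String): Int = suspendCoroutineUninterceptedOrReturn { cont ->
    val hit = FakeCache.values[key]
    if (hit != null) {
        hit                          // direct result: the coroutine keeps running
    } else {
        FakeCache.waiting = cont     // someone has to call resume later
        COROUTINE_SUSPENDED          // tell the caller that we really suspended
    }
}

fun runLookupDemo(): List<String> {
    val events = mutableListOf<String>()
    val block: suspend () -> Unit = {
        val first = lookup("a")
        events += "hit=$first"
        val second = lookup("b")
        events += "miss=$second"
    }
    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { events += "done" })
    events += "startCoroutine returned"   // reached while the coroutine is suspended
    FakeCache.waiting!!.resume(2)         // wakes the state machine up again
    return events
}
```

The hit is recorded before startCoroutine returns, and the miss only after the parked continuation is resumed, which matches the two cases the Object return type has to cover.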

suspendCoroutine does not require returning the marker by hand:

suspend fun mySafeSuspendOne() = suspendCoroutine<String> { continuation ->
    thread {
        TimeUnit.SECONDS.sleep(1)
        continuation.resume("hello world")
    }
    // suspendCoroutine is smart enough to work out that no result is available yet and automatically returns kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
}

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        // Wrap c in a proxy Continuation object
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        // Decide whether to return COROUTINE_SUSPENDED based on what the block did
        safe.getOrThrow()
    }

The mystery of the SafeContinuation

// Call the single-argument constructor
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)

@Volatile
private var result: Any? = initialResult // result starts as UNDECIDED

// Java atomic field updater
private companion object {
    @Suppress("UNCHECKED_CAST")
    @JvmStatic
    private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
        SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
    )
}

internal actual fun getOrThrow(): Any? {
    var result = this.result // atomic read
    if (result === UNDECIDED) { // If still UNDECIDED, set result to COROUTINE_SUSPENDED
        if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
        result = this.result // reread volatile var
    }
    return when {
        result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
        result is Result.Failure -> throw result.exception
        else -> result // either COROUTINE_SUSPENDED or data <- the suspension marker is returned here
    }
}

public actual override fun resumeWith(result: Result<T>) {
    while (true) { // lock-free loop
        val cur = this.result // atomic read. I do not understand why the official comment calls this an atomic read; I think @Volatile only guarantees visibility.
        when {
            // If still UNDECIDED, store the result.
            cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
            // If already suspended, call back into the state machine through resumeWith
            cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                delegate.resumeWith(result)
                return
            }
            else -> throw IllegalStateException("Already resumed")
        }
    }
}

val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()

To review what suspension means here: getOrThrow returns the COROUTINE_SUSPENDED marker. When does that happen? When the value of result has not been modified since it was initialized, in other words when resumeWith has not yet been executed: block(safe) ran without calling safe.resumeWith. It is as simple as that, and the CAS code guarantees the atomicity and thread safety of this critical logic.
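This behaviour can be observed from the outside without touching SafeContinuation itself. In the sketch below (Parked, resumesInsideBlock, resumesLater and runSafeContinuationDemo are hypothetical names), one function calls resume inside the block and therefore never suspends, while the other stores the continuation and is resumed later. The empty context keeps everything on the calling thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Holds a continuation that will be resumed later.
object Parked {
    var cont: Continuation<String>? = null
}

// resume is called inside the block, so a value is already present
// when the block returns: no suspension happens.
suspend fun resumesInsideBlock(): String = suspendCoroutine { it.resume("sync") }

// resume is not called inside the block, so the coroutine really suspends.
suspend fun resumesLater(): String = suspendCoroutine { Parked.cont = it }

fun runSafeContinuationDemo(): List<String> {
    val events = mutableListOf<String>()
    val block: suspend () -> Unit = {
        val a = resumesInsideBlock()
        events += "got $a"
        val b = resumesLater()
        events += "got $b"
    }
    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { events += "done" })
    events += "startCoroutine returned"   // the coroutine is suspended in resumesLater
    Parked.cont!!.resume("async")
    return events
}
```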

Back to the Demo-K1 example: hello2 has to resume from a new thread; otherwise it still does not actually suspend.

{
    thread {
        Thread.sleep(1000)
        it.resume(10086)
    }
}

Conclusion

Finally, we can say that starting a coroutine means having the compiler generate a state machine object that flattens callback code into synchronous-looking code.