1. What is a coroutine

  • Coroutines are a compiler-level feature: unlike threads, they need no operating-system or hardware support. The compiler provides a few keywords and generates the processing bytecode behind them, which makes this kind of code easier for developers to write

Purpose differences between threads and coroutines

  • The purpose of threads is to increase CPU utilization by letting multiple tasks run in parallel; they serve the machine.
  • The purpose of coroutines is to let multiple tasks cooperate better, mainly at the level of code logic; they serve the developer. (They can also improve resource utilization, but that is not their original purpose.)

Scheduling differences between threads and coroutines

  • Threads are scheduled by the operating system, usually preemptively, with CPU time allocated according to priority
  • Coroutine scheduling is defined by the developer, whose program logic decides which task gets resources at which time.

Relationship between coroutines and threads

  • Coroutines do not replace threads; they are an abstraction on top of threads. Threads are slices of CPU resources. Coroutines are organized flows of code

2. Basic use

2.1. CoroutineScope.launch

  • The launch function launches a new coroutine without returning the result to the caller

2.1.1. Code implementation

// Get a coroutine scope in which to create coroutines
private val mScope = MainScope()

mScope.launch(Dispatchers.IO) {
    // getStringInfo() runs on an IO thread and returns the result
    val res = getStringInfo()
    // Switch to the main thread to show the result
    withContext(Dispatchers.Main) {
        Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(res).show()
    }
}

private suspend fun getStringInfo(): String {
    return withContext(Dispatchers.IO) {
        // Suspends the coroutine for 1000 milliseconds without blocking its thread
        delay(1000)
        "Coroutine-launch"
    }
}

// Cancel manually in the onDestroy lifecycle method
override fun onDestroy() {
    super.onDestroy()
    mScope.cancel()
}

2.1.2. Steps

  1. Get a coroutine scope in which to create coroutines
  2. Launch a new coroutine through the scope's launch method
    1. The execution thread can be specified at launch
    2. Inside the coroutine, the withContext() method switches threads
  3. Cancel the scope manually in the onDestroy lifecycle method
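
The same three steps can be tried outside Android. The following is only a minimal sketch, assuming kotlinx-coroutines-core is on the classpath. It swaps MainScope and Dispatchers.Main (which need a UI main thread) for a plain CoroutineScope and Dispatchers.Default, and loadText and runLaunchDemo are hypothetical names used for illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for getStringInfo(): suspends without blocking a thread.
suspend fun loadText(): String = withContext(Dispatchers.IO) {
    delay(100)
    "Coroutine-launch"
}

fun runLaunchDemo(): String = runBlocking {
    // Step 1: get a scope (assumption: a plain scope replaces MainScope here)
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    val shown = AtomicReference("")

    // Step 2: launch a coroutine; launch returns a Job, not the result
    val job = scope.launch(Dispatchers.IO) {
        val res = loadText()
        // Switch context to publish the result (Dispatchers.Main in the Android version)
        withContext(Dispatchers.Default) { shown.set(res) }
    }
    job.join()

    // Step 3: cancel the scope when its owner goes away (onDestroy in the Android version)
    scope.cancel()
    shown.get()
}

fun main() {
    println(runLaunchDemo()) // Coroutine-launch
}
```

Because launch returns only a Job, the result has to be handed over explicitly; here that is done through an AtomicReference.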

2.2. CoroutineScope.async

  • The async function launches a coroutine whose result can be retrieved later, which makes it suitable for returning a value or for running tasks concurrently

2.2.1. Return value processing

private fun asyncReturn() {
    mScope.launch(Dispatchers.Main) {
        // async starts a new coroutine to run its body; the parent coroutine continues with the code that follows
        val deferred = async(Dispatchers.IO) {
            delay(1000)
            "Coroutine-Async"
        }
        // await() does not block the thread while waiting for the result: it suspends and gives up the thread.
        // The parent coroutine resumes once the async body has finished executing
        val data = deferred.await()
        Alerter.create(this@LearnCoroutineActivity).setTitle("Result").setText(data).show()
    }
}

2.2.2. Concurrent processing

private fun asyncConcurrent() {
    // How coroutineContext is put together is analyzed later
    val coroutineContext = Job() +
            Dispatchers.Main +
            CoroutineExceptionHandler { coroutineContext, throwable ->
                Log.e(
                    "CoroutineException", "CoroutineExceptionHandler: $throwable"
                )
            } +
            CoroutineName("asyncConcurrent")
    mScope.launch(coroutineContext) {
        val job1 = async(Dispatchers.IO) {
            delay(1000)
            "job1-finish"
        }
        val job2 = async(Dispatchers.IO) {
            delay(2000)
            "job2-finish"
        }
        val job3 = async(Dispatchers.IO) {
            delay(500)
            "job3-finish"
        }
        // Wait for each job to complete and merge the results
        Alerter.create(this@LearnCoroutineActivity).setTitle("Result")
            .setText("job1:${job1.await()},job2:${job2.await()},job3:${job3.await()}").show()
    }
}
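
To see that the three async blocks really overlap, the total time can be compared with the sum of the individual delays. This is a rough sketch, assuming kotlinx-coroutines-core is available; task and runConcurrently are hypothetical names, and the delays are shortened so the example finishes quickly.

```kotlin
import kotlinx.coroutines.*

// Hypothetical task: waits for the given time and returns a label.
suspend fun task(name: String, millis: Long): String {
    delay(millis)
    return "$name-finish"
}

// Returns the results (in declaration order) and the elapsed time in milliseconds.
fun runConcurrently(): Pair<List<String>, Long> = runBlocking {
    val start = System.nanoTime()
    val results = listOf(
        async(Dispatchers.IO) { task("job1", 300) },
        async(Dispatchers.IO) { task("job2", 600) },
        async(Dispatchers.IO) { task("job3", 150) }
    ).awaitAll()
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    results to elapsedMs
}

fun main() {
    val (results, elapsedMs) = runConcurrently()
    println(results)
    // Expected to be close to the longest delay (600 ms), not the sum (1050 ms)
    println("elapsed: $elapsedMs ms")
}
```

awaitAll returns the results in the order of the list, regardless of which coroutine finishes first.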

2.3. Coroutine scope

  • MainScope is a ready-made scope provided by the coroutines library, but other scopes are often more convenient
  • You can use lifecycleScope or viewModelScope, both of which are cancelled automatically
  • Use lifecycleScope in UI components that are a LifecycleOwner, and viewModelScope inside a ViewModel

3. CoroutineContext

  • CoroutineContext is a special collection that has features of both Map and Set

  • Each element in the collection is associated with a key (Map feature), and keys may not repeat (Set feature)

  • Elements can be combined with a + sign

  • There are four categories of elements, which together make up a CoroutineContext

    • Job: the unique identifier of a coroutine; it controls the coroutine's lifecycle (new, active, completing, completed, cancelling, cancelled)
    • CoroutineDispatcher: specifies the thread on which the coroutine runs (IO, Default, Main, Unconfined)
    • CoroutineName: specifies the name of the coroutine; the default is coroutine
    • CoroutineExceptionHandler: specifies the coroutine's exception handler, which handles uncaught exceptions
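
As an illustration of the four categories, a context can be assembled with + and each element read back through its key. This is a small sketch, assuming kotlinx-coroutines-core is on the classpath; buildContext is a hypothetical helper and the name "demo" is arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: one element from each of the four categories, combined with +.
fun buildContext(): CoroutineContext =
    Job() +
        Dispatchers.Default +
        CoroutineName("demo") +
        CoroutineExceptionHandler { _, throwable -> println("Caught: $throwable") }

fun main() {
    val context = buildContext()
    // Each element is looked up by its key, as in a Map
    println(context[CoroutineName]?.name)                             // demo
    println(context[Job]?.isActive)                                   // true
    println(context[ContinuationInterceptor] === Dispatchers.Default) // true
    println(context[CoroutineExceptionHandler] != null)               // true
}
```

The dispatcher is looked up under the ContinuationInterceptor key because CoroutineDispatcher is a ContinuationInterceptor.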

3.1. CoroutineDispatcher Element

  • Specifies the thread on which the coroutine runs
  • Kotlin has 4 built-in implementations of CoroutineDispatcher, exposed as the Default, IO, Main, and Unconfined properties of Dispatchers
public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}

3.1.1. Default, IO

Default and IO both use a thread pool internally

3.1.1.1. Default
  • Default obtains its thread pool according to the useCoroutinesScheduler property (true by default)
    • DefaultScheduler (useCoroutinesScheduler=true): Kotlin's own thread pool logic
    • CommonPool (useCoroutinesScheduler=false): thread pool logic built on the Executor from the Java class library
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    .....
}
// The delegate class
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
}
// Thread pool logic built on the Executor from the Java class library
internal object CommonPool : ExecutorCoroutineDispatcher() {}
// The common parent class, which defines the behavior
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {}
ExperimentalCoroutineDispatcher
  • DefaultScheduler's main implementation lives in its superclass, ExperimentalCoroutineDispatcher
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    ...// Omit the structure
    
    // Create CoroutineScheduler instance
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    override val executor: Executor
        get() = coroutineScheduler

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            // The dispatch method delegates to the CoroutineScheduler's dispatch method
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            ....
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            // dispatchYield delegates to CoroutineScheduler's dispatch method with tailDispatch = true
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            ...
        }
    
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            // dispatchWithContext delegates to CoroutineScheduler's dispatch method
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
            ....
        }
    }

    override fun close(): Unit = coroutineScheduler.close()

    // Returns a view of this dispatcher for potentially blocking tasks, limited to the given parallelism
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    }
    // Returns a view of this dispatcher for non-blocking tasks, limited to the given parallelism
    public fun limited(parallelism: Int): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    }
    
    ....// Omit some methods for testing to better track synchronization status
}
3.1.1.2. IO
  • The IO implementation is LimitingDispatcher
val IO: CoroutineDispatcher = LimitingDispatcher(
    this,
    systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
    "Dispatchers.IO",
    TASK_PROBABLY_BLOCKING
)
LimitingDispatcher
  • The class behind IO limits the maximum number of concurrent requests and queues the rest
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    // Thread-safe, non-blocking queue of tasks waiting for a free slot
    private val queue = ConcurrentLinkedQueue<Runnable>()
    // Number of in-flight tasks, updated atomically (CAS)
    private val inFlightTasks = atomic(0)
    
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            val inFlight = inFlightTasks.incrementAndGet()
            if (inFlight <= parallelism) {
                // LimitingDispatcher's dispatch delegates to DefaultScheduler's dispatchWithContext method
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            ..// Omitted some queue processing logic
        }
    }
}
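
The general idea of "cap the number of in-flight tasks and queue the rest" can be sketched without the coroutine library. The following is a simplified, stand-alone illustration on top of a plain java.util.concurrent.Executor. It is not the library's code: LimitingExecutor and maxConcurrency are hypothetical names, and the queue handling is one possible approach rather than the omitted logic above.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical class: runs at most `parallelism` tasks on `delegate` at once and queues the rest.
class LimitingExecutor(
    private val delegate: Executor,
    private val parallelism: Int
) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlight = AtomicInteger(0)

    override fun execute(command: Runnable) {
        var task = command
        while (true) {
            // Try to claim a slot; if one is free, hand the task to the delegate
            if (inFlight.incrementAndGet() <= parallelism) {
                submit(task)
                return
            }
            // No free slot: queue the task and give the claim back
            queue.add(task)
            if (inFlight.decrementAndGet() >= parallelism) return
            // A slot was freed in the meantime: retry with a queued task
            task = queue.poll() ?: return
        }
    }

    private fun submit(task: Runnable) {
        delegate.execute {
            try { task.run() } finally { afterTask() }
        }
    }

    // Called when a task finishes: reuse its slot for a queued task, or release the slot
    private fun afterTask() {
        val next = queue.poll()
        if (next != null) {
            submit(next)
            return
        }
        inFlight.decrementAndGet()
        // A task may have been queued just before the release; pick it up
        queue.poll()?.let { execute(it) }
    }
}

// Runs `tasks` short jobs and reports the highest concurrency that was observed.
fun maxConcurrency(parallelism: Int, tasks: Int): Int {
    val pool = Executors.newFixedThreadPool(8)
    val limited = LimitingExecutor(pool, parallelism)
    val running = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)
    val done = CountDownLatch(tasks)
    repeat(tasks) {
        limited.execute(Runnable {
            val now = running.incrementAndGet()
            maxSeen.updateAndGet { maxOf(it, now) }
            Thread.sleep(5)
            running.decrementAndGet()
            done.countDown()
        })
    }
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    return maxSeen.get()
}

fun main() {
    println(maxConcurrency(parallelism = 2, tasks = 20)) // never more than 2
}
```

Even though the underlying pool has 8 threads, the wrapper only lets 2 tasks run at a time, which is the role LimitingDispatcher plays in front of the shared scheduler.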

3.1.2. CoroutineScheduler

  • Default and IO share the CoroutineScheduler thread pool, for which Kotlin implements two scheduling policies
  • Internally, the two are distinguished by the task mode
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    ...
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        signalCpuWork()
    } else {
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}
Mode

Type     Mode
Default  TASK_NON_BLOCKING
IO       TASK_PROBABLY_BLOCKING

Handling strategy

  • Default: CoroutineScheduler creates at most corePoolSize threads. corePoolSize is max(2, number of CPU cores), so the thread count tries to match the number of CPU cores.
  • IO: more threads than corePoolSize may be created to run IO tasks, but never more than maxPoolSize.
    1. maxPoolSize is max(corePoolSize, min(number of CPU cores x 128, 2^21 - 2)), so it is at least corePoolSize and at most 2^21 - 2.
    2. 2^21 - 2 is a very large number (about 2M). CoroutineScheduler cannot actually create that many threads, so the number of submitted tasks has to be limited externally.
    3. Dispatchers.IO is constructed through LimitingDispatcher, which by default limits parallelism to max(64, number of CPU cores). At most that many tasks are submitted to CoroutineScheduler at once; the remaining tasks wait in a queue.

Suitable scenarios

  • Default
    1. CPU-intensive tasks keep the CPU busy while they run and consume a large amount of CPU resources.
    2. Examples are complex calculations and video decoding. If there are more threads than CPU cores at that point, the excess threads cannot be run by the CPU and only waste memory.
    3. Each thread also has its own stack and other memory, and with too many threads the cost of frequent thread switching hurts the performance of the thread pool.
    4. For CPU-intensive tasks, a thread pool whose concurrency equals the number of CPU cores makes the best use of the CPU.
  • IO
    1. IO-intensive tasks leave the CPU idle while they run and do not consume a large amount of CPU resources.
    2. Most IO-intensive tasks, such as network requests and other I/O operations, spend their time blocked. Blocked threads do not occupy CPU execution time.
    3. To keep the CPU busy, more threads should be created; ideally the number of threads equals the number of tasks submitted. The pool has a timeout policy that reclaims these threads once they become idle, so most of the time they do not use much memory.
    4. Extreme cases cannot be ruled out, however. In general, for IO-intensive tasks the number of concurrent threads in the thread pool should be as high as possible to improve CPU throughput.
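
The limits quoted above are simple arithmetic, so they can be worked through for a given core count. This sketch only re-computes the formulas as stated in the text; it does not read anything from the library, and PoolLimits and poolLimits are hypothetical names.

```kotlin
import kotlin.math.max
import kotlin.math.min

// Hypothetical holder for the three limits described in the text.
data class PoolLimits(val corePoolSize: Int, val maxPoolSize: Int, val ioParallelism: Int)

fun poolLimits(cpuCores: Int): PoolLimits {
    val maxSupported = (1 shl 21) - 2                 // 2^21 - 2
    val core = max(2, cpuCores)                       // Default: max(2, number of CPU cores)
    val maxPool = max(core, min(cpuCores * 128, maxSupported))
    val io = max(64, cpuCores)                        // IO: max(64, number of CPU cores)
    return PoolLimits(core, maxPool, io)
}

fun main() {
    println(poolLimits(8)) // PoolLimits(corePoolSize=8, maxPoolSize=1024, ioParallelism=64)
    println(poolLimits(1)) // PoolLimits(corePoolSize=2, maxPoolSize=128, ioParallelism=64)
}
```

On an 8-core machine this gives at most 8 threads for Default work, while up to 64 IO tasks may be in flight at once on the shared pool.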

3.1.3. Unconfined

  • A task starts executing on the thread that launched it. After a suspension, the thread that calls resume determines where the coroutine continues.
internal object Unconfined : CoroutineDispatcher() {
    // false means no dispatch is required
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Unconfined's dispatch method is invoked only when the yield method is called
        // yield() means the current coroutine gives up its thread to other coroutines
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    }
}
  • Each coroutine has a corresponding Continuation instance whose resumeWith is used to resume the coroutine; that logic lives in DispatchedContinuation
DispatchedContinuation
  • We'll focus on the implementation of resumeWith and on the class delegation
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    .....
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }
    ....
}

The analysis is as follows:

  1. Through class delegation, DispatchedContinuation adds its own logic in front of the wrapped continuation's resumeWith() method

  2. isDispatchNeeded (false for Unconfined, true for Default and IO) decides which path is taken

    1. true: calls the dispatch method of the coroutine's CoroutineDispatcher
    2. false: calls the executeUnconfined method
    private inline fun DispatchedContinuation<*>.executeUnconfined(
        contState: Any?, mode: Int, doYield: Boolean = false,
        block: () -> Unit
    ): Boolean {
        assert { mode != MODE_UNINITIALIZED }
        val eventLoop = ThreadLocalEventLoop.eventLoop
        if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
        return if (eventLoop.isUnconfinedLoopActive) {
            _state = contState
            resumeMode = mode
            eventLoop.dispatchUnconfined(this)
            true
        } else {
            runUnconfinedEventLoop(eventLoop, block = block)
            false
        }
    }
    1. Retrieve the eventLoop (tied to the current thread) from a ThreadLocal and check whether an Unconfined task is already being executed
      1. If so, call the EventLoop's dispatchUnconfined method to put the Unconfined task into the EventLoop
      2. If not, execute it directly
    internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
        eventLoop: EventLoop,
        block: () -> Unit
    ) {
        eventLoop.incrementUseCount(unconfined = true)
        try {
            block()
            while (true) {
                if (!eventLoop.processUnconfinedEvent()) break
            }
        } catch (e: Throwable) {
            handleFatalException(e, null)
        } finally {
            eventLoop.decrementUseCount(unconfined = true)
        }
    }
    1. Execute the block() code block, i.e. the resumeWith() call mentioned above
    2. Call processUnconfinedEvent() to execute the remaining Unconfined tasks, and break out of the loop once all of them have been executed
EventLoop
  • EventLoop is stored in a ThreadLocal, so it is associated with the current thread. EventLoop is also a subclass of CoroutineDispatcher
internal abstract class EventLoop : CoroutineDispatcher() {
    ...
    // A double-ended queue is used to store Unconfined tasks
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
    // Take an Unconfined task from the head of the queue and run it
    public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    }
    // Put the Unconfined task at the end of the queue
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }
    .....
}

The analysis is as follows:

  1. Internally, a double-ended queue stores the Unconfined tasks
    1. EventLoop's dispatchUnconfined method puts an Unconfined task at the end of the queue
    2. The processUnconfinedEvent method takes an Unconfined task from the head of the queue and runs it
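
The effect of this queue is that a nested Unconfined resume is deferred rather than run as a nested call. The following single-threaded sketch imitates that behaviour with the standard library only; MiniEventLoop and traceNestedResume are hypothetical names, and the real EventLoop is per-thread and considerably more involved.

```kotlin
// Hypothetical, single-threaded stand-in for the per-thread EventLoop.
class MiniEventLoop {
    private val queue = ArrayDeque<() -> Unit>()
    private var active = false

    fun runUnconfined(block: () -> Unit) {
        if (active) {
            // A loop is already running: queue the task at the tail instead of nesting the call
            queue.addLast(block)
            return
        }
        active = true
        try {
            block()
            // Drain the tasks that were queued while block() or earlier tasks were running
            while (true) {
                val next = queue.removeFirstOrNull() ?: break
                next()
            }
        } finally {
            active = false
        }
    }
}

fun traceNestedResume(): List<String> {
    val log = mutableListOf<String>()
    val loop = MiniEventLoop()
    loop.runUnconfined {
        log += "outer start"
        loop.runUnconfined { log += "nested" } // queued, not executed here
        log += "outer end"
    }
    return log
}

fun main() {
    println(traceNestedResume()) // [outer start, outer end, nested]
}
```

The nested task runs only after the outer block has returned, so deeply nested resumes do not grow the call stack.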

3.1.4. Main

  • Runs coroutines on the platform's main thread, which is the only thread allowed to manipulate UI objects. The implementation varies by platform:
    • Kotlin/JS: Kotlin's support for JavaScript, which covers translating Kotlin code and the Kotlin standard library as well as NPM package management. Dispatchers.Main is equivalent to Dispatchers.Default on Kotlin/JS.
    • Kotlin/Native: a technology for compiling Kotlin code into native binaries that run without a virtual machine. Its primary purpose is to allow compilation for platforms where a virtual machine is not needed or cannot be used, such as embedded devices or iOS. Dispatchers.Main is equivalent to Dispatchers.Default on Kotlin/Native.
    • Kotlin/JVM: for platforms that run on a virtual machine; Android, for example, belongs to Kotlin/JVM. On Kotlin/JVM the corresponding dispatcher has to be added as a dependency. Android needs the kotlinx-coroutines-android library, whose Android implementation of Dispatchers.Main essentially runs tasks on the Android main thread via a Handler.

3.2. CoroutineName Element

  • The name of the coroutine. It can be customized, which is convenient for debugging and analysis
public data class CoroutineName(
    val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
 
    public companion object Key : CoroutineContext.Key<CoroutineName>
    
    override fun toString(): String = "CoroutineName($name)"
}

3.3. CoroutineExceptionHandler Element

  • The coroutine's exception handler. Every coroutine gets a default exception handler when it is created; one can also be specified manually.

    val coroutineContext = Job() +
            Dispatchers.Main +
            // Add the specified exception handler manually
            CoroutineExceptionHandler { coroutineContext, throwable ->
                Log.e(
                    "CoroutineException", "CoroutineExceptionHandler: $throwable"
                )
            } +
            CoroutineName("asyncConcurrent")
  • However, this only takes effect for root coroutines started by the launch method, not for root coroutines started by async

    By default, a root coroutine started by async catches all uncaught exceptions and stores them in the returned Deferred. They are rethrown only when the user calls the Deferred's await method, so that call has to be wrapped in try-catch manually
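
The difference between the two builders can be observed directly. This is a minimal sketch, assuming kotlinx-coroutines-core is on the classpath; runHandlerDemo is a hypothetical name, and the SupervisorJob in the scope is a choice made for this example so that one failure does not cancel the scope.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Returns what the handler received and what try-catch around await() caught.
fun runHandlerDemo(): Pair<List<String>, String?> = runBlocking {
    val handled = CopyOnWriteArrayList<String>()
    val handler = CoroutineExceptionHandler { _, throwable ->
        handled += "handler got: ${throwable.message}"
    }
    // Assumption of this sketch: SupervisorJob keeps the scope alive after a failure
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)

    // Root coroutine started with launch: the uncaught exception reaches the handler
    scope.launch { throw IllegalStateException("from launch") }.join()

    // Root coroutine started with async: the exception is stored in the Deferred
    val deferred = scope.async<String> { throw IllegalStateException("from async") }
    val caught = try {
        deferred.await()
        null
    } catch (e: IllegalStateException) {
        e.message
    }

    scope.cancel()
    handled.toList() to caught
}

fun main() {
    val (handled, caught) = runHandlerDemo()
    println(handled) // [handler got: from launch]
    println(caught)  // from async
}
```

The handler never sees the async failure; it surfaces only at the await() call.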

Cases

Coroutines can be used in many different ways, so there are several exception-handling cases

  1. Without a SupervisorJob, an exception thrown by a child coroutine is delegated to the parent coroutine's CoroutineExceptionHandler
    1. The child coroutine's own CoroutineExceptionHandler is not executed
  2. With a SupervisorJob, the exception does not propagate upward, so the child coroutine's own CoroutineExceptionHandler receives it
  3. When child coroutines throw several exceptions at the same time, CoroutineExceptionHandler captures only the first one; the subsequent exceptions are stored in the first exception's suppressed array
  4. Cancelling a coroutine throws a CancellationException, but no CoroutineExceptionHandler receives it; it can only be caught with try-catch

3.4. CoroutineContext structure

  • CoroutineContext is a special collection that has features of both Map and Set
  • Each element in the collection is associated with a key (Map feature), and keys may not repeat (Set feature)
  • Elements can be combined with a + sign
  • Every Element is itself a CoroutineContext
public interface CoroutineContext {
    // The operator [] is overloaded to retrieve the Element associated with the Key by CoroutineContext[Key]
    public operator fun <E : Element> get(key: Key<E>): E?

    // An aggregation function: traverses every Element in the CoroutineContext from left to right and applies an operation to each one
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    
    // The + operator is overloaded, so contexts can be combined as CoroutineContext + CoroutineContext
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else 
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
    
    // Return a new CoroutineContext that removes the Element corresponding to the Key
    public fun minusKey(key: Key<*>): CoroutineContext

    // Key definition, empty implementation
    public interface Key<E : Element>
    
    // Element definition. Each Element is a CoroutineContext
    public interface Element : CoroutineContext {
        ...
    }
}

Kotlin's operator overloading is used here to implement the different strategies

  • Apart from the plus method, the other three methods of CoroutineContext are overridden by CombinedContext, Element, and EmptyCoroutineContext
    • CombinedContext: the implementation of the CoroutineContext collection structure, which has a recursive definition
    • Element: an element of the CombinedContext
    • EmptyCoroutineContext: an empty CoroutineContext whose methods have empty implementations

CombinedContext

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    ....
}
  • CombinedContext holds two members, left and element
    • left may be an Element or another CombinedContext
    • element is a single Element
The get method
  • The Element associated with a Key can be retrieved with CoroutineContext[Key]
override fun <E : Element> get(key: Key<E>): E? {
    var cur = this
    while (true) {
        // If the current element matches the key, return it
        cur.element[key]?.let { return it }
        // Otherwise continue the search in left
        val next = cur.left
        // If left is a CombinedContext, move to it and repeat
        if (next is CombinedContext) {
            cur = next
        } else {
            // left is the last node: return its match, or null if the key is absent
            return next[key]
        }
    }
}
The fold method
  • Traverses every Element in the CoroutineContext from left to right and applies an operation to each one
// operation is a function parameter; a lambda or a function reference can be passed
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
    // Fold left first, then apply operation to that result and the current element
    operation(left.fold(initial, operation), element)
The minusKey method
  • Returns a new CoroutineContext from which the Element corresponding to the Key has been removed
public override fun minusKey(key: Key<*>): CoroutineContext {
    // If the current element matches the key, drop it by returning left
    element[key]?.let { return left }
    // Otherwise try to remove the key from left
    val newLeft = left.minusKey(key)
    return when {
        // left does not contain the key either, so the current CombinedContext is unchanged
        newLeft === left -> this
        // left contained the key and is empty after removal: only the current element remains
        newLeft === EmptyCoroutineContext -> element
        // left contained the key and is not empty after removal: build a new CombinedContext and return it
        else -> CombinedContext(newLeft, element)
    }
}
Structure overview

  • The structure resembles a linked list in which left is the pointer to the next node
  • get and minusKey check the current element first and move on to left only if it does not match, so they proceed from right to left
  • fold reaches every element through left, recursing down to the innermost element, and then applies the operation on the way back, so elements are visited from left to right.
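
The traversal order can be checked with nothing but the Kotlin standard library. In this sketch UserId, TraceId, Lang and names are hypothetical elements and helpers defined only for the example.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Three hypothetical elements, each with its own key.
class UserId(val value: Int) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
    override fun toString() = "UserId($value)"
}

class TraceId(val value: String) : AbstractCoroutineContextElement(TraceId) {
    companion object Key : CoroutineContext.Key<TraceId>
    override fun toString() = "TraceId($value)"
}

class Lang(val tag: String) : AbstractCoroutineContextElement(Lang) {
    companion object Key : CoroutineContext.Key<Lang>
    override fun toString() = "Lang($tag)"
}

// fold visits the elements from left to right, i.e. in the order they were added.
fun names(context: CoroutineContext): List<String> =
    context.fold(listOf<String>()) { acc, element -> acc + element.toString() }

fun main() {
    val context = UserId(7) + TraceId("abc") + Lang("en")
    println(names(context))                   // [UserId(7), TraceId(abc), Lang(en)]
    println(context[TraceId]?.value)          // abc
    println(names(context.minusKey(TraceId))) // [UserId(7), Lang(en)]
}
```

minusKey returns a new context and leaves the original untouched, which is why context can still be folded afterwards.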

The plus method

This method is implemented in CoroutineContext itself and internally consists of two parts: merging the elements and handling the interceptor

public operator fun plus(context: CoroutineContext): CoroutineContext =
    // If the context being added is empty, return this context directly
    if (context === EmptyCoroutineContext) this else
        // Fold over the elements of the context being added
        context.fold(this) { acc, element -> // acc = context accumulated so far, element = element being added
            // Remove any element with the same key from the accumulated context
            val removed = acc.minusKey(element.key)
            // Nothing is left after the removal: the result is just the new element
            if (removed === EmptyCoroutineContext) element else {
                // Look for an interceptor in what is left
                val interceptor = removed[ContinuationInterceptor]
                // No interceptor: build a new CombinedContext with the new element
                if (interceptor == null) CombinedContext(removed, element) else {
                    // There is an interceptor: take it out so it can be added back last
                    val left = removed.minusKey(ContinuationInterceptor)
                    // Only the interceptor was left: combine the new element with the interceptor
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        // Otherwise append the new element to the rest and add the interceptor at the end
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }
  • The CoroutineContext returned by the plus method never contains two elements with the same key: a new element replaces the existing element with the same key, which is the Set feature

  • The interceptor handling keeps the ContinuationInterceptor as the last element of the CoroutineContext after every addition, so that it can act before the coroutine executes

    CoroutineDispatcher inherits from ContinuationInterceptor

    • With the ContinuationInterceptor at the end, a lookup in the context finds the interceptor first, without a recursive search, so the interceptor can run ahead of everything else
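
Both properties of plus can be observed from outside by folding the resulting context into a list. This is a small sketch, assuming kotlinx-coroutines-core is on the classpath; elementsOf is a hypothetical helper and the coroutine names are arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: lists the elements in fold order (left to right).
fun elementsOf(context: CoroutineContext): List<CoroutineContext.Element> =
    context.fold(listOf<CoroutineContext.Element>()) { acc, element -> acc + element }

fun main() {
    // The dispatcher (a ContinuationInterceptor) is added first...
    val context = Dispatchers.Default + CoroutineName("first") + Job()
    // ...but plus keeps it as the last element
    println(elementsOf(context).last() is ContinuationInterceptor) // true

    // Adding an element whose key already exists replaces the old element
    val renamed = context + CoroutineName("second")
    println(renamed[CoroutineName]?.name)                          // second
    println(elementsOf(renamed).size)                              // 3
    println(elementsOf(renamed).last() is ContinuationInterceptor) // true
}
```

The element count stays at three after the rename, which shows that the old CoroutineName was replaced rather than kept alongside the new one.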

4. Job Element

  • Every coroutine created (through launch or async) returns a Job instance that uniquely identifies the coroutine and is responsible for managing its lifecycle

4.1. Job states

During execution, a Job passes through a series of states. The states cannot be read directly, but a Job exposes three properties

  • isActive (whether the Job is active)
  • isCompleted (whether the Job has completed)
  • isCancelled (whether the Job has been cancelled)

From these properties, the state of the Job can be inferred as follows

  • New
    • A coroutine that has been created but not yet started is in the New state
  • Active
    • The coroutine becomes Active when the Job's start/join method is called
  • Completing
    • When the coroutine body finishes, or when the complete method of CompletableJob (a subinterface of Job) is called, the coroutine enters the Completing state
  • Completed
    • A coroutine in the Completing state enters the Completed state once all of its child coroutines have completed
  • Cancelling
    • A failure or a call to the Job's cancel method puts the coroutine into the Cancelling state
  • Cancelled
    • A coroutine in the Cancelling state enters the Cancelled state once all of its child coroutines have completed

State                            isActive   isCompleted   isCancelled
New (optional initial state)     false      false         false
Active (default initial state)   true       false         false
Completing (transient state)     true       false         false
Cancelling (transient state)     false      false         true
Cancelled (final state)          false      true          true
Completed (final state)          false      true          false

                                      wait children
+-----+ start  +--------+ complete   +-------------+  finish  +-----------+
| New | -----> | Active | ---------> | Completing  | -------> | Completed |
+-----+        +--------+            +-------------+          +-----------+
                 |  cancel / fail       |
                 |     +----------------+
                 |     |
                 V     V
             +------------+                           finish  +-----------+
             | Cancelling | --------------------------------> | Cancelled |
             +------------+                                   +-----------+
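
The flags in the table can be printed at each stage of a Job's life. This is a small sketch that assumes the kotlinx.coroutines library is on the classpath; the helper name flags is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Formats the three observable flags of a Job (helper name is hypothetical).
fun flags(job: Job): String =
    "isActive=${job.isActive} isCompleted=${job.isCompleted} isCancelled=${job.isCancelled}"

fun main() = runBlocking {
    // LAZY start leaves the coroutine in the New state until start() or join().
    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(50) }
    println("New:       ${flags(lazyJob)}")

    lazyJob.start()
    println("Active:    ${flags(lazyJob)}")

    lazyJob.join()
    println("Completed: ${flags(lazyJob)}")

    // A job that is cancelled and then joined ends in the Cancelled state.
    val cancelled = launch { delay(10_000) }
    cancelled.cancelAndJoin()
    println("Cancelled: ${flags(cancelled)}")
}
```

The transient Completing and Cancelling states are not printed here because they are only observable while a Job is still waiting for its children.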


4.2. Common Job functions

fun start(): Boolean
  • Starts the coroutine associated with this Job
  • Returns true if the coroutine had not been started before this call
  • Returns false if the coroutine was already started or has already completed
fun cancel(cause: CancellationException? = null)
  • Cancels the Job, with an optional cancellation cause
fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
  • Registers a completion handler on the Job. When the Job completes, the handler is invoked synchronously. The handler type is: typealias CompletionHandler = (cause: Throwable?) -> Unit.
  • The cause parameter of the CompletionHandler describes how the Job finished. There are three cases:
    • If the Job completed normally, cause is null
    • If the Job was cancelled normally, cause is a CancellationException. This should not be treated as an error; it is the normal cancellation of a task and generally does not need to be recorded in the error log.
    • Any other cause means that the Job failed.
  • The function returns a DisposableHandle. If the Job no longer needs to be monitored, call DisposableHandle.dispose to remove the handler. If the Job has already completed, there is no need to call dispose; the handler is removed automatically.
suspend fun join() (suspend function)
  • Called from another coroutine to suspend it until this Job completes.
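
The functions above can be tried together in a few lines. This is a sketch that assumes kotlinx.coroutines is available; describe is a hypothetical helper that maps the cause onto the three cases listed for CompletionHandler.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Maps the cause passed to a CompletionHandler onto the three cases (helper name is hypothetical).
fun describe(cause: Throwable?): String = when (cause) {
    null -> "completed normally"
    is CancellationException -> "cancelled"
    else -> "failed: $cause"
}

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) { delay(50) }
    job.invokeOnCompletion { cause -> println("first job ${describe(cause)}") }

    println(job.start()) // true: the job had not been started yet
    println(job.start()) // false: the job is already started
    job.join()           // suspends this coroutine until the job completes

    val other = launch { delay(10_000) }
    val handle = other.invokeOnCompletion { cause -> println("second job ${describe(cause)}") }
    other.cancelAndJoin()
    handle.dispose()     // not required after completion, shown only for completeness
}
```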

4.3. Job Exception Propagation

  • Coroutines form a parent-child hierarchy. If a child Job fails with an exception, the failure propagates to the parent Job, which is cancelled along with its other children. To prevent this behavior, use a SupervisorJob

    This applies to exceptions other than CancellationException

SupervisorJob
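import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

fun main() {
    val parentJob = GlobalScope.launch {
        // Passing SupervisorJob() makes it the child's parent instead of parentJob,
        // so the child's failure is not propagated to parentJob
        val childJob = launch(SupervisorJob()) {
            throw NullPointerException()
        }
        childJob.join()
        println("parent complete")
    }
    Thread.sleep(1000)
}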

In this case, the exception thrown by childJob is not propagated to parentJob. The parentJob keeps running and prints "parent complete". The exception itself is still uncaught, so it is reported to the uncaught exception handler.
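
To compare the two behaviors side by side, the sketch below launches a failing child and a slow sibling under either a plain Job or a SupervisorJob. It assumes kotlinx.coroutines is available; siblingSurvives is a hypothetical helper, and the empty CoroutineExceptionHandler is there only to keep the uncaught exception from being printed.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches a failing child and a slow sibling under the given parent job and
// reports whether the sibling managed to finish (helper name is hypothetical).
fun siblingSurvives(parent: Job): Boolean = runBlocking {
    // The handler only keeps the uncaught exception from being printed.
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(parent + Dispatchers.Default + handler)
    var finished = false
    val failing = scope.launch { throw IllegalStateException("boom") }
    val sibling = scope.launch {
        delay(100)
        finished = true
    }
    joinAll(failing, sibling)
    finished
}

fun main() {
    println("Job:           sibling finished = ${siblingSurvives(Job())}")
    println("SupervisorJob: sibling finished = ${siblingSurvives(SupervisorJob())}")
}
```

With a plain Job the failure cancels the parent and therefore the sibling; with a SupervisorJob the sibling is left alone and runs to the end.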


5. CoroutineScope

  • CoroutineScope is a container that provides a CoroutineContext. It defines the boundary within which coroutines are launched, so that everything started inside the scope can be controlled as a whole through that CoroutineContext. The source code is as follows:
public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

The interface needs only this one CoroutineContext property: it ensures that the context is passed along for the whole run of a coroutine, and it bounds the scope in which that CoroutineContext applies
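
As an illustration of a scope acting as a boundary, here is a minimal sketch of a class that implements CoroutineScope itself. It assumes kotlinx.coroutines is available; Repository, refresh, close and closeCancelsChildren are hypothetical names.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// A component that owns its coroutines (class name is hypothetical).
class Repository : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext: CoroutineContext = job + Dispatchers.Default

    // Every coroutine launched here inherits coroutineContext.
    fun refresh(): Job = launch { delay(10_000) }

    // Cancels every coroutine that was started in this scope.
    fun close() = cancel()
}

fun closeCancelsChildren(): Boolean {
    val repository = Repository()
    val first = repository.refresh()
    val second = repository.refresh()
    repository.close()
    return first.isCancelled && second.isCancelled
}

fun main() {
    println(closeCancelsChildren())
}
```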

5.1. lifecycleScope

  • lifecycleScope gives coroutines the same lifecycle awareness as an Activity: they are cancelled when the lifecycle is destroyed
public val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope
    
// Initialize and control exception propagation
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }

// Implement lifecycle awareness and invoke key methods
internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    init {
        if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
            coroutineContext.cancel()
        }
    }
 
    fun register() {
        launch(Dispatchers.Main) {
            if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
                lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
            } else {
                coroutineContext.cancel()
            }
        }
    }

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
  
}

// Extension function that implements cancellation
public fun CoroutineContext.cancel(): Unit {
    this[Job]?.cancel()
}
  • Source code analysis:
    1. LifecycleCoroutineScopeImpl is created as the implementation of the CoroutineScope interface
    2. SupervisorJob controls exception propagation, so one failed child does not cancel the others
    3. Dispatchers.Main.immediate keeps the coroutines on the main thread
    4. The register method creates a coroutine with launch, observes the lifecycle state of the Activity, and calls cancel at the appropriate time
    5. Cancellation is implemented by the CoroutineContext.cancel extension function

5.2. Other methods

  • lifecycleScope also provides launch functions that run a block only once the lifecycle has reached a given state
lifecycleScope.launchWhenCreated {  }
lifecycleScope.launchWhenStarted {  }
lifecycleScope.launchWhenResumed {  }

6. ContinuationInterceptor

  • ContinuationInterceptor extends CoroutineContext.Element, which in turn extends CoroutineContext
  • ContinuationInterceptor provides the interceptContinuation method, which performs the interception by returning a continuation that wraps the original one

6.1. intercepted call chain

public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...
}
// 1. The call chain, starting from launch
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        // 2. Create a StandaloneCoroutine
        StandaloneCoroutine(newContext, active = true)
    // 3. Start the coroutine
    coroutine.start(start, coroutine, block)
    return coroutine
}

// The start method
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    when (this) {
        // CoroutineStart.DEFAULT starts the coroutine with startCoroutineCancellable
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // Create the coroutine, intercept it with intercepted(), and finally call resumeCancellable, which is essentially resumeWith
        // As a result, resumeWith is called once automatically every time a coroutine is started
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
    }

6.2. intercepted Concrete implementation

// Declaration shared across platforms
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
// JVM (Android) implementation
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

// The method that is finally called
// On the first call the cached value is null, so interceptContinuation is invoked and its result is cached
public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

6.3. Example code

  • ContinuationInterceptor intercepts coroutines when they are started. Here is a simple example
GlobalScope.launch(object : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        println("intercept")
        return object : Continuation<T> by continuation {
            override fun resumeWith(result: Result<T>) {
                println("create new thread")
                thread {
                    continuation.resumeWith(result)
                }
            }
        }
    }
}) {
    withContext(Dispatchers.IO) {
      //do something
    }
}.start()

Based on the examples, the following questions are raised:

6.4. Why do we need to keep ContinuationInterceptor as the last element in the CoroutineContext in the plus method?

  1. The ContinuationInterceptor is placed at the end to make it faster to find. The reasons are:
    • A CombinedContext consists of left and element. left plays the role of the preceding part of a linked list: it holds all the elements added earlier
    • element is a single plain CoroutineContext.Element, and the get method always starts from element when looking for the CoroutineContext object that matches a Key
    • Only when element does not match does get continue the lookup in left
  2. The ContinuationInterceptor is used very frequently: every time a coroutine is created, the current CoroutineContext is queried for a ContinuationInterceptor
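
That the context is queried for an interceptor when a coroutine starts can be checked without kotlinx.coroutines, using startCoroutine from the standard library. This is only a sketch: LoggingInterceptor and runIntercepted are hypothetical names, and the interceptor merely records what it sees.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Records every interception and resumption it sees (class name is hypothetical).
class LoggingInterceptor(private val log: MutableList<String>) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        log += "intercept"
        return object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += "resume"
                continuation.resumeWith(result)
            }
        }
    }
}

// Starts a trivial coroutine with the interceptor in its context and returns the recorded events.
fun runIntercepted(): List<String> {
    val log = mutableListOf<String>()
    val completion = object : Continuation<Int> {
        override val context: CoroutineContext = LoggingInterceptor(log)
        override fun resumeWith(result: Result<Int>) {
            log += "done: ${result.getOrNull()}"
        }
    }
    val block: suspend () -> Int = { 42 }
    block.startCoroutine(completion)
    return log
}

fun main() {
    println(runIntercepted()) // [intercept, resume, done: 42]
}
```

The coroutine is intercepted once at start, and the initial resumeWith goes through the wrapper before the body runs.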

7. Suspend | Coroutine state machine

  • A function marked with the suspend keyword is a suspend function. Under the hood it works as follows:

7.1. The CPS mechanism

  • Through the CPS (Continuation-Passing Style) transformation, the compiler adds a parameter of type Continuation to every function or lambda expression marked with suspend
// Before the transformation
@GET("users/{user}/repos")
suspend fun getRepoListByUser(@Path("user") user: String): ApiResponse<List<Repo>>
// After the transformation
@GET("users/{user}/repos")
fun getRepoListByUser(@Path("user") user: String, c: Continuation<ApiResponse<List<Repo>>>): Any?
  • The return type changes to Any? because:
    • When a suspend function suspends, it returns the special marker COROUTINE_SUSPENDED, which is just an Any
    • When it runs to the end without suspending, it returns its result directly, or throws the exception that was raised
    • Any? is the only Kotlin type that can hold both, so it is used as the return type
  • The source of Continuation is as follows:
public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}
  • Member analysis:
    • context: the context of the coroutine, in most cases of type CombinedContext
    • resumeWith: resumes a suspended coroutine so that it continues from the point where it was suspended, passing in the result or exception of the operation it was waiting for
  • When the Continuation is obtained:
    • The Continuation is created together with the coroutine. When launch is called, the Continuation object is created and, with the default start mode, the coroutine is started
    • Inside the launch block, this object is passed as the extra argument to every suspend function that is called
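
The two return paths of a CPS-transformed function (a direct result, or COROUTINE_SUSPENDED) can be written by hand with the standard library intrinsic suspendCoroutineUninterceptedOrReturn. The sketch below is an illustration only: Mailbox, receive, send and demo are hypothetical names.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine

// Holds the continuation of a suspended caller until a value arrives (hypothetical helper).
class Mailbox {
    private var value: String? = null
    private var waiter: Continuation<String>? = null

    // Returns the value directly if it is present, otherwise reports suspension.
    suspend fun receive(): String = suspendCoroutineUninterceptedOrReturn<String> { cont ->
        val ready = value
        if (ready != null) {
            ready                   // fast path: the result is returned without suspending
        } else {
            waiter = cont           // keep the Continuation so the caller can be resumed later
            COROUTINE_SUSPENDED     // tell the caller that this call suspended
        }
    }

    fun send(v: String) {
        value = v
        val cont = waiter
        waiter = null
        cont?.resume(v)             // continue the suspended caller with the value
    }
}

fun demo(): List<String> {
    val log = mutableListOf<String>()
    val mailbox = Mailbox()
    val block: suspend () -> Unit = {
        val first = mailbox.receive()   // suspends: no value yet
        log += "got $first"
        val second = mailbox.receive()  // does not suspend: the value is already there
        log += "got $second"
    }
    block.startCoroutine(Continuation(EmptyCoroutineContext) { log += "finished" })
    log += "after start"
    mailbox.send("hello")
    return log
}

fun main() {
    println(demo()) // [after start, got hello, got hello, finished]
}
```

"after start" is logged before the first "got hello", which shows that the first receive call returned to its caller by suspending rather than by blocking.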

7.2. Coroutine state machines

Suspend functions are marked with suspend, but whether a real suspension happens is decided by whether COROUTINE_SUSPENDED is returned. In the generated code, suspension and resumption are handled by a state machine. When the coroutine has to suspend, it saves its local state, records the next state label, and then suspends by returning from the method. The current thread is not blocked while the coroutine is suspended. Resumption goes through resumeWith, which enters the next state of the state machine and restores the state that was saved at the suspension point

  • Let's analyze the coroutine state machine using the code decompiled from Kotlin bytecode
public final class TextActivity extends ComponentActivity {
   protected void onCreate(@Nullable Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);
         //...
         public final Object invokeSuspend(@NotNull Object $result) {
            // The suspension marker
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            // label selects the current state; each case holds the code for that state
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               TextActivity var10000 = TextActivity.this;
               this.label = 1;
               if (var10000.test(this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            case 2:
               // Runs the corresponding logic, then is resumed after the 2-second delay
               ResultKt.throwOnFailure($result);
               $this$async = this.p$;
               this.L$0 = $this$async;
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {
                  return var2;
               }
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return Unit.INSTANCE;
         }
         //...
         public final Object invoke(Object var1, Object var2) {
            // When the coroutine starts, resumeWith is called once; internally it runs invokeSuspend
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);
   }
   //...
}
  1. getCOROUTINE_SUSPENDED returns the COROUTINE_SUSPENDED marker, which signals suspension
  2. label selects the state to run, and the suspension marker is returned from the corresponding case
  3. When COROUTINE_SUSPENDED is returned, the method exits and the coroutine is suspended. The current thread is free to run other logic and is not blocked by the suspension
  4. When the coroutine is resumed, the method is entered again in the next state and runs the corresponding code
  5. When label reaches case 2, the coroutine is resumed once the delay has elapsed
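
The same mechanism can be written by hand in Kotlin to make the steps easier to follow. The sketch below is a simplified imitation of what the compiler generates, not the generated code itself: fetchCps, pending, WorkStateMachine and demo are hypothetical names, and the stand-in call always suspends.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Stand-in for a suspending call in CPS form: it stores the continuation and
// reports suspension instead of blocking (all names here are hypothetical).
var pending: Continuation<Any?>? = null

fun fetchCps(cont: Continuation<Any?>): Any? {
    pending = cont
    return COROUTINE_SUSPENDED
}

// Hand-written equivalent of a suspend function that logs "start",
// calls fetch twice, and logs each result.
class WorkStateMachine(private val log: MutableList<String>) : Continuation<Any?> {
    override val context: CoroutineContext = EmptyCoroutineContext
    private var label = 0

    // Resuming simply re-enters the state machine with the delivered result.
    override fun resumeWith(result: Result<Any?>) {
        invokeSuspend(result.getOrThrow())
    }

    fun invokeSuspend(result: Any?): Any? {
        when (label) {
            0 -> {
                log += "start"
                label = 1 // where to continue after the next resume
                if (fetchCps(this) === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            }
            1 -> {
                log += "got $result"
                label = 2
                if (fetchCps(this) === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            }
            2 -> {
                log += "got $result"
                label = 3
            }
            else -> error("already completed")
        }
        return Unit
    }
}

fun demo(): List<String> {
    val log = mutableListOf<String>()
    val machine = WorkStateMachine(log)
    machine.invokeSuspend(Unit)               // start: runs state 0 and suspends
    log += "thread is free"                   // the caller keeps running while suspended
    pending?.resumeWith(Result.success("a"))  // resume: runs state 1 and suspends again
    pending?.resumeWith(Result.success("b"))  // resume: runs state 2 and finishes
    return log
}

fun main() {
    println(demo()) // [start, thread is free, got a, got b]
}
```

Because the stand-in call always suspends, each state returns right away; the generated code would instead fall through to the next state when a call returns a result without suspending.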