In the previous article, a source-code analysis of Retrofit, I shared my own understanding of coroutines: they are not black magic. They simply encapsulate, or automatically generate, the thread dispatching and result-delivering callbacks that asynchronous code needs.

A callback is wrapped as a Continuation, and calling its resume() or resumeWithException() to return a result or throw an exception is no different from invoking a regular callback.

Thread dispatching is wrapped as a dispatcher, which is essentially a coroutine interceptor: it intercepts continuations in order to decide where the callback runs.

This article focuses on how the dispatcher works inside a coroutine. We start from what it is built on: the coroutine context and the coroutine interceptor.

1. Coroutine context

CoroutineContext is an interface that defines a data structure. It has three direct implementations: Element, EmptyCoroutineContext, and CombinedContext.

public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?

    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    public operator fun plus(context: CoroutineContext): CoroutineContext =  ...

    public fun minusKey(key: Key<*>): CoroutineContext
}

CoroutineContext defines four methods:

Method | Role
get(key: Key<E>): E? | Finds an Element by its Key
fold(initial: R, operation: (R, Element) -> R): R | Folds over all Elements
plus(context: CoroutineContext): CoroutineContext | Combines two CoroutineContexts
minusKey(key: Key<*>): CoroutineContext | Removes the Element with the given Key

These methods should look familiar: CoroutineContext is effectively a collection of Elements indexed by Key.
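To make the "collection indexed by Key" idea concrete, here is a minimal sketch that uses only the Kotlin standard library. The element classes UserName and RequestId and the helper describe() are hypothetical names invented for this illustration; they are not part of any library.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical elements, defined only for this illustration.
// Each element class uses its companion object as its Key.
class UserName(val value: String) : AbstractCoroutineContextElement(UserName) {
    companion object Key : CoroutineContext.Key<UserName>
}

class RequestId(val value: Int) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

// Uses fold() to list the simple class name of every element in the context.
fun describe(context: CoroutineContext): List<String> =
    context.fold(emptyList<String>()) { acc, element -> acc + element.javaClass.simpleName }

// Uses plus() to combine two elements into one context.
fun sampleContext(): CoroutineContext = UserName("alice") + RequestId(7)
```

With this sketch, sampleContext()[UserName] looks an element up by its Key, and describe(sampleContext().minusKey(UserName)) shows what remains after the element with that Key is removed.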

1.1. Element

Element is itself an interface that extends CoroutineContext. It implements the same collection methods, but as a collection that holds only one element: itself.

1.2. EmptyCoroutineContext

EmptyCoroutineContext is a singleton object that, as the name suggests, is an empty implementation.

1.3. CombinedContext

CombinedContext is a singly linked list, but each node stores a reference to the previous part of the list rather than to the next element, as an ordinary linked list would.

The get() method checks the element stored in the current node first and then walks back through the previous nodes.

The fold() method processes the current node's element last: it first calls itself recursively on the previous elements and only then applies the operation to its own element.
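The lookup and fold order described above can be sketched with a small model. This is a simplified, hypothetical re-creation for illustration only: Ctx, Item, Combined, find() and foldAll() are invented names, and keys are plain strings rather than real Key objects.

```kotlin
// Simplified, hypothetical model of the left-linked list; not the stdlib classes.
sealed class Ctx
data class Item(val key: String, val value: String) : Ctx()
data class Combined(val left: Ctx, val element: Item) : Ctx()

// get(): check the element stored in the current node first, then walk left.
fun Ctx.find(key: String): Item? {
    var cur: Ctx = this
    while (true) {
        when (cur) {
            is Item -> return if (cur.key == key) cur else null
            is Combined -> {
                if (cur.element.key == key) return cur.element
                cur = cur.left
            }
        }
    }
}

// fold(): recurse into the left part first, then apply the operation to this node's element.
fun <R> Ctx.foldAll(initial: R, operation: (R, Item) -> R): R = when (this) {
    is Item -> operation(initial, this)
    is Combined -> operation(left.foldAll(initial, operation), element)
}
```

In this model, a context built as Combined(Combined(a, b), c) is searched starting from c, while foldAll() still visits a, b, c in insertion order.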

1.4. ContinuationInterceptor

CoroutineContext.Element has one important sub-interface: ContinuationInterceptor.

Its added interceptContinuation() method returns a new continuation that wraps the original callback continuation, so that every resumption can be intercepted.
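As a minimal sketch of interception, the following uses only the standard library's kotlin.coroutines package. RecordingInterceptor and runIntercepted() are hypothetical names made up for this example; the wrapper simply records that a resumption passed through it before forwarding to the original continuation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical interceptor: wraps each continuation and logs every resumption.
class RecordingInterceptor(val log: MutableList<String>) : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += "intercepted resume"
                continuation.resumeWith(result) // forward to the original continuation
            }
        }
}

// Starts a coroutine whose context contains the interceptor and returns what was logged.
fun runIntercepted(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> String = {
        log += "body"
        "done"
    }
    body.startCoroutine(Continuation(RecordingInterceptor(log)) { result ->
        log += "completed with ${result.getOrNull()}"
    })
    return log
}
```

Because the coroutine's first resumption goes through the wrapper, the interception entry is logged before the body runs. A dispatcher uses the same hook, except that instead of forwarding immediately it may hand the resumption to another thread.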

Now back to the implementation of plus().

plus() merges two CoroutineContexts by adding each Element of the context being merged in to the current CoroutineContext.

While doing so, it removes any existing Element with the same Key and keeps the interceptor in the last position.

Two properties of CoroutineContext follow from this: only one Element per Key can exist in the list, and the interceptor always sits at the end of the list. Because of how CombinedContext is structured (each node points to the previous part of the list, and the context is held through its last node), the last item is the first one visited, so the interceptor is found immediately.
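Both properties can be observed with the standard library alone. In this sketch, Label, Tag, NoOpInterceptor and foldOrder() are hypothetical names introduced for the illustration; NoOpInterceptor does nothing except occupy the ContinuationInterceptor key.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical elements used only to observe how plus() arranges a context.
class Label(val text: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

class Tag(val text: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

// Pass-through interceptor: returns the continuation unchanged.
object NoOpInterceptor : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

// Lists the elements in the order fold() visits them.
fun foldOrder(context: CoroutineContext): List<String> =
    context.fold(emptyList<String>()) { acc, e ->
        acc + when (e) {
            is Label -> "Label(${e.text})"
            is Tag -> "Tag(${e.text})"
            else -> "interceptor"
        }
    }

// The interceptor is added second, and Label is added twice.
fun mergedContext(): CoroutineContext =
    Label("old") + NoOpInterceptor + Label("new") + Tag("t")
```

Folding over mergedContext() visits Label(new), then Tag(t), then the interceptor: the older Label has been replaced, and the interceptor has been moved to the end even though it was added second.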

2. The dispatcher

Back in CoroutineScope.launch(), the context parameter defaults to EmptyCoroutineContext, the empty implementation mentioned earlier. Inside the function body, CoroutineScope.newCoroutineContext() is called to wrap the context that was passed in.

During wrapping, Dispatchers.Default is added if no other dispatcher or ContinuationInterceptor has been specified.

Dispatchers.Default is the default dispatcher implementation, and a dispatcher is what decides where a coroutine runs.

First, look at the class that defines a dispatcher: CoroutineDispatcher.

It adds two methods, isDispatchNeeded() and dispatch(). isDispatchNeeded() decides whether a dispatch is needed at the moment, and dispatch() contains the code that performs it.

CoroutineDispatcher is itself a context Element and implements the ContinuationInterceptor interface; in interceptContinuation() it wraps the continuation in a DispatchedContinuation.

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

DispatchedContinuation implements Continuation. It overrides resumeWith() and, inside it, calls the two methods that CoroutineDispatcher adds.

If isDispatchNeeded() returns true, dispatcher.dispatch() is called to hand the resumption to the target thread. Otherwise, continuation.resumeWith() is called directly on the current thread.
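The decision made in resumeWith() can be sketched without kotlinx.coroutines. SketchDispatcher, SketchDispatchedContinuation and threadNameUnder() below are hypothetical, heavily simplified stand-ins written for this article; the real classes contain much more logic (cancellation, yielding, reuse of continuations).

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical, simplified stand-in for CoroutineDispatcher (not the library class).
abstract class SketchDispatcher : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> get() = ContinuationInterceptor
    open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    abstract fun dispatch(context: CoroutineContext, block: Runnable)

    final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        SketchDispatchedContinuation(this, continuation)
}

// Hypothetical, simplified stand-in for DispatchedContinuation.
class SketchDispatchedContinuation<T>(
    private val dispatcher: SketchDispatcher,
    private val delegate: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = delegate.context

    override fun resumeWith(result: Result<T>) {
        if (dispatcher.isDispatchNeeded(context)) {
            // Hand the resumption over to wherever the dispatcher runs its blocks.
            dispatcher.dispatch(context, Runnable { delegate.resumeWith(result) })
        } else {
            // No dispatch needed: resume right here on the calling thread.
            delegate.resumeWith(result)
        }
    }
}

// Runs a coroutine under the given dispatcher and reports the thread its body ran on.
fun threadNameUnder(dispatcher: SketchDispatcher): String {
    val latch = CountDownLatch(1)
    var name = ""
    val body: suspend () -> String = { Thread.currentThread().name }
    body.startCoroutine(Continuation(dispatcher) { result ->
        name = result.getOrThrow()
        latch.countDown()
    })
    latch.await()
    return name
}
```

A subclass whose dispatch() calls executor.execute(block) makes the body run on the executor's thread, while a subclass whose isDispatchNeeded() returns false leaves the body on the thread that started the coroutine.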

If you need to control how coroutines are dispatched, you can extend the CoroutineDispatcher class. In practice you rarely need to implement one yourself: the Dispatchers object already provides four built-in dispatchers, Main, Default, IO and Unconfined.

Main runs the coroutine body on the main thread, Default and IO run it on a thread pool, and Unconfined imposes no restriction. Here is how each of them works.

2.1. Main

Main is obtained from MainDispatcherLoader.

Looking into MainDispatcherLoader, its main job is to create a MainDispatcherFactory, and the MainDispatcherFactory in turn creates the MainCoroutineDispatcher. A factory is needed because Kotlin has multiple platform implementations, and each platform implements the main dispatcher differently.

On Android the factory is AndroidDispatcherFactory and the dispatcher it creates is HandlerContext. Seeing the familiar Looper there, you can probably guess how it works.

isDispatchNeeded() decides whether a dispatch is needed based on whether the dispatcher is the immediate variant and whether the caller is already on the main thread. dispatch() is implemented by calling handler.post(). The handler is the main-thread Handler obtained from Looper.getMainLooper().asHandler(). How a Runnable is executed after it has been posted is a separate topic and is not covered here.

In short, on Android Main dispatches by posting every task to the main thread via handler.post().
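The "immediate and already on the main thread" check can be illustrated without the Android SDK. In the sketch below, MainLikeDispatcher is a hypothetical class: a single-thread executor plays the role of the main Looper, and executor.execute() plays the role of handler.post(). It is an analogy under those assumptions, not the HandlerContext source.

```kotlin
import java.util.concurrent.Executors

// Hypothetical stand-in for a Handler-based dispatcher.
// The single executor thread plays the role of the main thread.
class MainLikeDispatcher(private val immediate: Boolean) {
    @Volatile
    private var mainThread: Thread? = null

    private val executor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "fake-main").also {
            it.isDaemon = true
            mainThread = it
        }
    }

    // A dispatch is skipped only for the immediate variant when already on the "main" thread.
    fun isDispatchNeeded(): Boolean =
        !immediate || Thread.currentThread() !== mainThread

    // Analogous to handler.post(block).
    fun dispatch(block: Runnable) {
        executor.execute(block)
    }

    // Mirrors the decision a dispatched continuation makes when it is resumed.
    fun resume(block: Runnable) {
        if (isDispatchNeeded()) dispatch(block) else block.run()
    }
}
```

With immediate set to true, a block that is already running on the fake main thread and calls resume() again runs the nested block inline instead of posting it; with immediate set to false, the nested block is always posted.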

2.2. Default

Default is created by a method called createDefaultDispatcher(), which uses DefaultScheduler directly.

Most of DefaultScheduler's implementation lives in its superclass ExperimentalCoroutineDispatcher, so let's go straight to its two dispatch-related methods.

isDispatchNeeded() is inherited unchanged from the base class CoroutineDispatcher. It returns true, meaning a dispatch is needed in every case.

The second method, dispatch(), is implemented in ExperimentalCoroutineDispatcher, but all it does is delegate to coroutineScheduler.

coroutineScheduler.dispatch() is more complex, so the following analysis walks through it step by step.

The first call, createTask(), wraps the Runnable in a Task together with a taskContext, so that TaskContext.afterTask() is called after run() has finished. Default makes little use of this, but it matters later when we get to IO.

The second call, currentWorker(), takes Thread.currentThread() and casts it to Worker. What is a Worker?

Worker is the Thread implementation inside coroutineScheduler (it can also be seen as a wrapper around Thread); coroutines dispatched by Default run on these Workers.

So, back to this method: when does Thread.currentThread() as? Worker return a Worker, and when does it return null? The direct answer is that it returns a Worker when the current thread is a Worker object. In other words, the result is non-null when a Default dispatch is triggered again from inside a coroutine body that Default has already dispatched. Conversely, it returns null if the Default dispatch is triggered from the main thread.

The third call is currentWorker.submitToLocalQueue(), an extension method that, as the name suggests, puts the Task into the Worker's localQueue, the queue of tasks that this thread needs to execute. This may not succeed: the Task is returned if the submission fails, and null is returned if it succeeds.

The submission fails, and the Task is not added to the local queue, if currentWorker is null, if the Worker has already terminated, or if the Worker is currently in the blocking state while the Task is non-blocking.

Then, if submitToLocalQueue() did not return null, addToGlobalQueue() is called to add the Task to the task queue that coroutineScheduler maintains itself. This global queue holds every Task that has not been assigned to a particular Worker, waiting to be taken out and executed.
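The local-queue-first, global-queue-as-fallback decision can be sketched as follows. SketchWorker and SketchScheduler are hypothetical miniatures whose names only echo the real ones; task modes, the blocking-state check and the signalling step are left out on purpose.

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical worker thread with its own task queue.
class SketchWorker(body: Runnable) : Thread(body) {
    val localQueue = ArrayDeque<Runnable>()

    @Volatile
    var terminated = false
}

// Hypothetical miniature of the scheduler's dispatch path.
class SketchScheduler {
    val globalQueue = ConcurrentLinkedQueue<Runnable>()

    // Non-null only when the caller is itself a worker thread.
    private fun currentWorker(): SketchWorker? = Thread.currentThread() as? SketchWorker

    // Returns null on success, or the task itself when it could not be queued locally.
    private fun SketchWorker?.submitToLocalQueue(task: Runnable): Runnable? {
        if (this == null) return task
        if (terminated) return task
        localQueue.addLast(task)
        return null
    }

    fun dispatch(task: Runnable) {
        val notAdded = currentWorker().submitToLocalQueue(task)
        // Fall back to the shared queue when there is no usable local queue.
        if (notAdded != null) globalQueue.add(notAdded)
    }
}
```

Calling dispatch() from an ordinary thread lands the task in globalQueue, while calling it from inside a SketchWorker lands the task in that worker's localQueue.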

In the last step, if the task is non-blocking, signalCpuWork() is called, which in turn tries two methods to obtain a Worker.

tryUnpark() tries to pop a Worker from the stack of parked workers and, if that succeeds, calls LockSupport.unpark() to wake the corresponding thread. LockSupport is a thread-blocking utility class whose paired methods park() and unpark() block and wake a thread. It requires no object lock, and unpark() may be called before park() without the wake-up being lost. The action here is to wake a blocked Worker from the stack.
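The park() and unpark() pairing is easy to try in isolation. The function parkUntilSignalled() below is a hypothetical helper for this article; it only shows the blocking and waking mechanism, not the scheduler's stack of parked workers.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport

// Parks a thread until another thread signals that work is available.
fun parkUntilSignalled(): Boolean {
    val hasWork = AtomicBoolean(false)
    val ranWork = AtomicBoolean(false)

    val worker = Thread {
        // park() may return spuriously, so the condition is re-checked in a loop.
        while (!hasWork.get()) LockSupport.park()
        ranWork.set(true)
    }
    worker.start()

    hasWork.set(true)
    // Safe even if the worker has not parked yet: the permit is remembered.
    LockSupport.unpark(worker)
    worker.join()
    return ranWork.get()
}
```

No lock is taken anywhere, and it does not matter whether unpark() happens before or after the worker reaches park(), which is the property the scheduler relies on when it wakes an idle Worker.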

If no Worker could be taken from the stack, tryCreateWorker() is called to try to create one. As long as the number of running threads has not reached the configured upper limit, a new Worker is created; the first time around, two Workers are created together so that work stealing is possible (a Worker can take over another Worker's pending tasks and run them itself). Here corePoolSize is determined by the JVM environment.

Creation checks the upper limit, then creates the Worker and calls start() to start the thread.

Whether the Worker was woken up or started, execution ends up in Worker.run(), which obtains a Task through findTask() and then calls executeTask() to run it.

executeTask() runs the task by calling Runnable.run(); it is that simple.

In most cases findTask() goes on to findAnyTask(), which takes a task from the Worker's own queue or from the global queue depending on a flag and a random number. If neither yields a task, it walks through the other Workers' queues with trySteal() and takes over their tasks. All of this is about getting tasks done sooner.
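The lookup order, including stealing, can be sketched like this. StealingWorker and StealingPool are hypothetical names, tasks are plain strings, and the sketch always tries the local queue before the global one, whereas the text above notes that the real code picks between them using a flag and a random number.

```kotlin
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical worker that only holds a queue of task names.
class StealingWorker(val name: String) {
    val localQueue = ConcurrentLinkedDeque<String>()
}

// Hypothetical pool showing the order in which a worker looks for its next task.
class StealingPool(private val workers: List<StealingWorker>) {
    val globalQueue = ConcurrentLinkedQueue<String>()

    // Simplified order: own queue, then global queue, then other workers' queues.
    fun findTask(self: StealingWorker): String? =
        self.localQueue.pollFirst()
            ?: globalQueue.poll()
            ?: trySteal(self)

    // Takes a task from the tail of the first other worker that has one.
    private fun trySteal(self: StealingWorker): String? {
        for (victim in workers) {
            if (victim === self) continue
            val stolen = victim.localQueue.pollLast()
            if (stolen != null) return stolen
        }
        return null
    }
}
```

A worker with an empty local queue and an empty global queue therefore still makes progress as long as some other worker has queued tasks.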

At this point Default has basically no secrets left. In essence it creates a thread, starts a thread, or wakes a thread up.

2.3. IO

IO dispatching is built on Default. Its implementation is LimitingDispatcher, and DefaultScheduler itself is passed in as a constructor parameter. You can think of IO as Default plus some extra processing.

Look into LimitingDispatcher, again starting with dispatch().

On every dispatch, the inFlight counter is incremented by one and compared with the parallelism parameter. inFlight can be understood as the number of tasks currently being executed, and parallelism is the upper limit on concurrency, which is determined by JVM configuration.

If the number of concurrent tasks has not reached the limit, the dispatch method of the Default dispatcher that was passed in is called; otherwise the task is put into a queue that LimitingDispatcher maintains itself, to be executed later.

As you might have noticed, it does not call dispatch() but Default's dispatchWithContext(), passing itself in as the TaskContext. This is where TaskContext.afterTask(), mentioned earlier under createTask(), comes into play: it is invoked each time a task finishes.

afterTask() tries to take a new task from the queue and dispatch it; if none is available, it decrements the counter by one.
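The counter-plus-queue logic can be sketched on top of a plain java.util.concurrent.Executor. SketchLimitingDispatcher is a hypothetical class written for this article; the Executor stands in for the Default scheduler, and the try/finally wrapper stands in for the afterTask() callback.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical limiter: at most `parallelism` tasks are handed to the delegate at once.
class SketchLimitingDispatcher(
    private val delegate: Executor,
    private val parallelism: Int
) {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlight = AtomicInteger(0)

    fun dispatch(block: Runnable) {
        var task = block
        while (true) {
            if (inFlight.incrementAndGet() <= parallelism) {
                submit(task) // below the limit: run it on the delegate
                return
            }
            queue.add(task) // over the limit: park the task in our own queue
            if (inFlight.decrementAndGet() >= parallelism) return
            // A slot was freed in the meantime: retry with a queued task.
            task = queue.poll() ?: return
        }
    }

    private fun submit(task: Runnable) {
        delegate.execute {
            try {
                task.run()
            } finally {
                afterTask()
            }
        }
    }

    // Called after every task: reuse the slot for a queued task, or release it.
    private fun afterTask() {
        val next = queue.poll()
        if (next != null) {
            submit(next)
            return
        }
        inFlight.decrementAndGet()
        val late = queue.poll() ?: return
        dispatch(late)
    }
}
```

With parallelism set to 2 and three tasks dispatched, only two reach the delegate at first; the third is submitted when one of the first two finishes.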

That is IO: Default with extra logic that limits the number of concurrent tasks. As long as the limit is not reached, IO and Default behave the same.

2.4. Unconfined

Unconfined's implementation is simple: isDispatchNeeded() returns false, so the continuation is resumed directly on whichever thread calls resumeWith().
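What "no dispatch" means in practice can be seen with the standard library alone. The sketch below runs a coroutine with no interceptor at all, which is an assumption standing in for Unconfined rather than the Unconfined source itself; resumeThreadWithoutDispatch() is a hypothetical helper name.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspends a coroutine, resumes it from another thread, and reports where it continued.
fun resumeThreadWithoutDispatch(): String {
    var resumedOn = ""
    var saved: Continuation<Unit>? = null

    val body: suspend () -> Unit = {
        // Suspend and stash the continuation so another thread can resume it.
        suspendCoroutine<Unit> { cont -> saved = cont }
        resumedOn = Thread.currentThread().name
    }
    // No interceptor in the context, so nothing redirects the resumption.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { })

    val resumer = Thread({ saved!!.resume(Unit) }, "resumer-thread")
    resumer.start()
    resumer.join()
    return resumedOn
}
```

The code after the suspension point continues on resumer-thread, the thread that called resume(), which is the same effect an undispatched resumption has.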

3. Summary

After reading the source of the four dispatchers, none of this is black magic any more. Underneath it is the familiar handler.post() or Thread.start(), no different from writing an asynchronous callback yourself. To return to the explanation at the beginning: coroutines simply encapsulate, or automatically generate, the thread dispatching and callbacks that asynchronous code needs. They are still asynchronous callbacks at heart, but with the tedious work wrapped up they are very convenient to use, and knowing how they work makes the tool more comfortable to use.

If you spot any problems in this article, please point them out.

Your likes and comments are my biggest motivation!
