This article has been reprinted with authorization from hongyangAndroid, an official WeChat account.

This article covers how to use coroutines on Android and analyzes the coroutine source code.

Before I talk about coroutines, let me say a few words about threads and thread pools:

A thread is a kernel resource of the operating system and the smallest unit of CPU scheduling. Every application runs on threads, which are the basis of concurrency and asynchrony. In the Java API, Thread is the basic class for working with threads. Each time a Thread object is started, the operating system kernel starts a thread. The source code of Thread consists largely of JNI calls, because threads must be supported directly by the operating system. Each Java thread corresponds one-to-one to a native thread. On Android, the pthread_create function of the Linux API is called when a thread is created.

Using threads directly has the following problems:

  • Threads are not lightweight resources. Creating a large number of them consumes a lot of system resources, and with traditional blocking calls many threads sit blocked and unable to run, which wastes those resources.
  • Switching a thread between the blocked and running states is expensive and has long been an optimization target. For example, the JVM optimizes locks at run time with techniques such as spin locks, lock coarsening, and lock elimination.

A thread pool is a tool for managing threads based on the idea of pooling. Using a thread pool has the following benefits:

  • Reduced resource consumption: pooling reuses threads that have already been created, which lowers the cost of creating and destroying threads.
  • Improved response time: a task can run as soon as it arrives, without waiting for a thread to be created.
  • Improved manageability: threads are a scarce resource. Created without limit, they consume system resources and, through poor distribution, unbalance resource scheduling and reduce system stability. A thread pool allows unified allocation, tuning, and monitoring.
  • Extensibility: thread pools are extensible, so developers can add more functionality to them.

What do coroutines have to do with threads? On the JVM, Kotlin coroutines are an API built on thread pools; they do not go beyond what Java or Kotlin already provide.

Definition of coroutines

Coroutines are a programming concept that is not tied to any particular language. The term was coined by Melvin Edward Conway in 1958 while he was building an assembly program, and early language-level support appeared in Simula and Modula-2. On Android, coroutines simplify code that executes asynchronously; they became a stable part of Kotlin in version 1.3.

Use of coroutines

Here’s how to use coroutines:

Dependencies

To use coroutines, add the following dependencies to the build.gradle files.

build.gradle file in the root directory of the project:

// build.gradle(AndroidGenericFramework)
ext {
    // Omit some code
    kotlinxCoroutinesVersion = '1.3.1'
    // Omit some code
}

build.gradle file of the module:

// build.gradle(:app)
dependencies {
    // Omit some code
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlinxCoroutinesVersion"
    // Omit some code
    testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinxCoroutinesVersion"
    // Omit some code
}

  • org.jetbrains.kotlinx:kotlinx-coroutines-core: the core coroutine library. It contains the public coroutine API, and this common layer keeps the coroutine interface consistent across platforms.
  • org.jetbrains.kotlinx:kotlinx-coroutines-android: the platform library for the current platform, here Android. It is the concrete implementation of coroutines on that platform, because facilities such as multithreading are implemented differently on each platform.
  • org.jetbrains.kotlinx:kotlinx-coroutines-test: the coroutine test library, which makes it easier to use coroutines in tests.

The important thing here is to keep the versions of the three libraries consistent.

Basics

Here are the basics of coroutines:

Starting a coroutine

Coroutines can be started in two ways:

  • launch: starts a new coroutine but does not return a result to the caller.
  • async: starts a new coroutine and lets you obtain its result through the suspend function await.

Typically we use launch to start a new coroutine from a regular function, and use async only when we want parallel decomposition.

The source of the async function, which returns its result through a Deferred, is as follows:

// Builders.common.kt
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

The async function returns a Deferred. The Deferred interface inherits from the Job interface and is a non-blocking, cancelable future.

Note that launch and async handle exceptions differently. Because async expects await to be called eventually, it holds any exception and rethrows it from the await call. If await is never called, the exception can be dropped silently: it does not appear in crash metrics and is not noted in logcat.

The await function waits for a single coroutine and the awaitAll function waits for several; both guarantee that the coroutines have completed before the result is returned.
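
As a rough sketch of the builders described above, assuming kotlinx-coroutines-core is on the classpath. The names square, sumOfSquares, and squares are hypothetical and exist only for illustration:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a slow computation or request.
suspend fun square(n: Int): Int {
    delay(10)
    return n * n
}

// Parallel decomposition: start two async coroutines and await both results.
suspend fun sumOfSquares(a: Int, b: Int): Int = coroutineScope {
    val first = async { square(a) }
    val second = async { square(b) }
    first.await() + second.await()
}

// awaitAll waits for every Deferred in the list and returns the results in order.
suspend fun squares(values: List<Int>): List<Int> = coroutineScope {
    values.map { async { square(it) } }.awaitAll()
}

fun main() = runBlocking<Unit> {
    // launch returns a Job; there is no result to read back.
    val job = launch { println("side effect only") }
    job.join()
    println(sumOfSquares(3, 4))       // 25
    println(squares(listOf(1, 2, 3))) // [1, 4, 9]
}
```

Here launch is used for work whose result is not needed, while async is used only where two computations can run side by side and their results are combined.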

Typically, coroutines are created in one of three ways:

runBlocking

Use the runBlocking top-level function to create a coroutine. This approach blocks the current thread, so it suits unit tests and is generally not used in regular business code.

runBlocking {
    login()
}

The source of the runBlocking function is as follows:

// Builders.kt
@Throws(InterruptedException::class)
// The first argument, context, is the coroutine context and defaults to EmptyCoroutineContext; the second argument is a lambda expression with a CoroutineScope receiver that takes no arguments and returns T
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // Create or use a private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if the context's interceptor is an event loop that we should use (to support TestContext),
        // or use an existing thread-local event loop if there is one to avoid blocking it (but do not create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

GlobalScope

Creating a coroutine by calling launch on the GlobalScope singleton does not block the thread, but it is not recommended on Android: the scope lives as long as the application, so coroutines that are not handled properly cannot be cancelled and can cause memory leaks. The sample code looks like this:

GlobalScope.launch {
    login()
}

The GlobalScope source code is as follows:

// CoroutineScope.kt
public object GlobalScope : CoroutineScope {
    /** Returns [EmptyCoroutineContext]. */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

EmptyCoroutineContext is an empty coroutine context.

CoroutineScope

Creating a CoroutineScope object and calling its launch function lets you control the coroutine's life cycle through the CoroutineContext passed in. This is the recommended approach, as shown in the following example code:

CoroutineScope(Dispatchers.IO).launch {
    login()
}

Dispatchers.IO is one kind of CoroutineContext and is covered below.

A CoroutineScope manages one or more related coroutines and can be used to start new coroutines within that scope.

Unlike a dispatcher, a CoroutineScope does not run coroutines.

An important job of a CoroutineScope is to stop coroutine execution when the user leaves a content area of your application. It ensures that all running operations are stopped correctly.
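
The cancellation behaviour described above can be sketched as follows, assuming kotlinx-coroutines-core is available. Screen, startPolling, and close are hypothetical names standing in for a component with a life cycle; this is an illustration, not how any Android class is implemented:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical screen-like owner: it creates a scope and cancels it when it goes away.
class Screen {
    private val scope = CoroutineScope(Job() + Dispatchers.Default)

    fun startPolling(): Job = scope.launch {
        while (isActive) {
            delay(50) // stands in for periodic work; delay is a cancellation point
        }
    }

    // Cancels every coroutine that was launched in this scope.
    fun close() = scope.cancel()
}

fun main() = runBlocking<Unit> {
    val screen = Screen()
    val job = screen.startPolling()
    screen.close()
    job.join()
    println(job.isCancelled) // true
}
```

The scope itself runs nothing; it only ties the launched coroutines to one Job so that a single cancel call stops them all.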

The source code of the coroutineScope function (lowercase c), a suspend function that creates a scope and runs the given block in it, is as follows:

// CoroutineScope.kt
// The argument block is a lambda expression with a CoroutineScope receiver that takes no arguments and returns R
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val coroutine = ScopeCoroutine(uCont.context, uCont)
        coroutine.startUndispatchedOrReturn(coroutine, block)
    }

Use coroutines in Android

On the Android platform, coroutines help solve two major problems:

  • Managing long-running tasks that, if not managed properly, can block the main thread and freeze your application's interface.
  • Providing main-safety, that is, safely calling network or disk operations from the main thread.

Manage long-running tasks

On the Android platform, each application has a main thread that handles the interface and manages user interaction. If too much work is assigned to the main thread, the interface renders slowly or freezes and responds slowly to touch events. Network requests, JSON parsing, reading from or writing to a database, and traversing large lists should all be done on a worker thread.

To handle long-running tasks, coroutines add two operations to regular functions. In addition to invoke (or call) and return, coroutines add suspend and resume:

  • suspend pauses execution of the current coroutine and saves all local variables.
  • resume continues a suspended coroutine from the point where it was suspended.

A suspend function can only be called from another suspend function, or from a new coroutine started with a coroutine builder (for example, launch).

Kotlin uses stack frames to manage which function is running along with its local variables. When a coroutine is suspended, the current stack frame is copied and saved for later; when it is resumed, the stack frame is copied back from where it was saved and the function starts running again.

At compile time, the compiler applies a continuation-passing style (CPS) transformation to functions marked with the suspend modifier, which changes their function signature. Here's an example:

The await function is a suspend function with the following signature:

suspend fun <T> CompletableFuture<T>.await(): T

After the continuation-passing style (CPS) transformation at compile time, it becomes:

fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?

We can see that the CPS transformation adds a parameter of type Continuation. The Continuation interface looks like this:

interface Continuation<in T> {
    val context: CoroutineContext
    fun resumeWith(result: Result<T>)
}

A continuation wraps the code that the coroutine goes on to execute after it has been suspended. During compilation, a complete coroutine is split into one continuation after another. When the suspended await function completes, it calls the resumeWith function of the continuation argument to resume the code that follows the await call.

After the CPS transformation the function returns Any? because its result is a union of type T (the actual return value) and the COROUTINE_SUSPENDED marker. Since Kotlin has no syntax for union types, the most general type, Any?, is used. The COROUTINE_SUSPENDED marker indicates that the suspend function has actually suspended.
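
To make suspend and resume concrete, here is a minimal sketch that uses only the Kotlin standard library (kotlin.coroutines), with no kotlinx dependency. The names pending, awaitValue, and runDemo are hypothetical; the sketch captures a continuation at a suspension point and resumes it by hand:

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Holds the continuation captured at the suspension point so it can be resumed later.
var pending: Continuation<Int>? = null

// Suspends the caller and stores its continuation instead of resuming immediately.
suspend fun awaitValue(): Int = suspendCoroutine { cont -> pending = cont }

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Int = {
        log += "before suspend"
        val v = awaitValue()      // the coroutine really suspends here
        log += "resumed with $v"
        v * 2
    }
    // The completion continuation receives the final result through resumeWith.
    body.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            log += "completed with ${result.getOrThrow()}"
        }
    })
    log += "caller continues while suspended"
    pending?.resume(21)           // resume from where the coroutine was suspended
    return log
}

fun main() {
    runDemo().forEach(::println)
}
```

The log shows the order of events: the body runs until it suspends, control returns to the caller, and calling resume on the stored continuation runs the rest of the body and finally the completion's resumeWith.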

The three dispatchers described below all inherit from the CoroutineDispatcher class, whose source code is as follows:

// CoroutineDispatcher.kt
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    // Omit some code

}

This class implements the ContinuationInterceptor interface, whose source code is as follows:

// ContinuationInterceptor.kt
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {

    // Define the key of the context interceptor
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

    // Return a continuation that wraps the original continuation and so intercepts every resumption
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    // Called for the Continuation instance returned by interceptContinuation once it is no longer needed
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }

    public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
        // getPolymorphicKey specialized for the ContinuationInterceptor key
        @OptIn(ExperimentalStdlibApi::class)
        if (key is AbstractCoroutineContextKey<*, *>) {
            @Suppress("UNCHECKED_CAST")
            return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
        }
        @Suppress("UNCHECKED_CAST")
        return if (ContinuationInterceptor === key) this as E else null
    }

    public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
        // minusPolymorphicKey specialized for the ContinuationInterceptor key
        @OptIn(ExperimentalStdlibApi::class)
        if (key is AbstractCoroutineContextKey<*, *>) {
            return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
        }
        return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
    }
}

This interface, the continuation interceptor, is responsible for intercepting the code the coroutine executes after resuming (that is, the continuation) and resuming it on the specified thread or thread pool.

During compilation, each suspend function is compiled into an anonymous class that implements the Continuation interface. Calling a suspend function does not necessarily suspend the coroutine. For example, suppose some logic requests data over the network by calling the await function. If the result of the request is not yet available, the coroutine is suspended until the result arrives. The continuation interceptor only intercepts continuations at suspension points where a suspension actually happens; at suspension points where no suspension happens, the coroutine calls the resumeWith function directly and the continuation interceptor is not involved.

The continuation interceptor caches the continuations it has intercepted and releases them in the releaseInterceptedContinuation function once they are no longer needed.
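
A small standard-library-only sketch can show where an interceptor gets involved. LoggingInterceptor and runIntercepted are hypothetical names; a real dispatcher would hand the continuation to its own thread at the marked line instead of resuming it inline. In this sketch the initial start of the coroutine is also routed through the interceptor, in addition to the resumption after the real suspension:

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical interceptor that records every resumption it intercepts.
class LoggingInterceptor(private val log: MutableList<String>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                log += "intercepted"
                continuation.resumeWith(result) // a dispatcher would switch threads here
            }
        }
}

fun runIntercepted(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<Unit>? = null
    val body: suspend () -> Unit = {
        log += "start"
        suspendCoroutine<Unit> { saved = it } // a real suspension
        log += "after suspension"
    }
    body.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = LoggingInterceptor(log)
        override fun resumeWith(result: Result<Unit>) {
            log += "done"
        }
    })
    saved?.resume(Unit) // resuming goes through the intercepted continuation
    return log
}

fun main() {
    runIntercepted().forEach(::println)
}
```

The wrapper returned by interceptContinuation is the hook that CoroutineDispatcher builds on: it decides where and when the wrapped continuation is actually resumed.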

Use coroutines for main-safety

Kotlin coroutines use dispatchers to determine which threads execute a coroutine. Every coroutine must run in a dispatcher. A coroutine can suspend itself, and the dispatcher is responsible for resuming it.

Kotlin provides three dispatchers that you can use to specify where coroutines should run:

  • Dispatchers.Main: runs coroutines on the Android main thread. It should be used only for interacting with the interface and performing quick work, such as calling suspend functions, running Android interface framework operations, and updating LiveData objects.
  • Dispatchers.Default: suited to CPU-intensive work off the main thread, such as sorting lists and parsing JSON.
  • Dispatchers.IO: suited to disk or network I/O off the main thread, such as database operations (using Room), reading from or writing to files, and any network operations.

We can switch threads by calling the withContext function and passing in the corresponding CoroutineContext.

The withContext function is a suspend function. It lets you control the thread pool of any line of code without introducing callbacks, so it can be applied to very small functions, as shown in the sample code below:

suspend fun getUserInfo() {       // Dispatchers.Main
    val data = fetchUserInfo()    // Dispatchers.Main
    show(data)                    // Dispatchers.Main
}

suspend fun fetchUserInfo() {     // Dispatchers.Main
    withContext(Dispatchers.IO) { // Dispatchers.IO
        // Execute network requests // Dispatchers.IO
    }                             // Dispatchers.Main
}

In the sample code, the getUserInfo function runs on the main thread and can safely call the fetchUserInfo function, which performs the network request on a worker thread. getUserInfo is suspended until the withContext block finishes, and then the coroutine on the main thread resumes the subsequent logic with the result of fetchUserInfo.

Compared with an equivalent callback-based implementation, withContext adds no extra overhead, and in some cases it performs better. For example, an outer withContext call lets Kotlin stay on the same dispatcher and switch threads only once, even if the code inside calls withContext several times. In addition, Kotlin optimizes switching between Dispatchers.Default and Dispatchers.IO to avoid thread switches wherever possible.

Note that Dispatchers.Default and Dispatchers.IO are both dispatchers backed by thread pools, so a withContext block is not guaranteed to run on the same thread from start to finish. Thread-local variables may therefore not point to the same value for the entire withContext block.
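
A plain-JVM sketch of the same idea, assuming kotlinx-coroutines-core is on the classpath. Dispatchers.Main is not available outside Android, so the thread that calls runBlocking stands in for the main thread here, and readConfig is a hypothetical function:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking read, made safe to call from any thread by wrapping it in withContext.
suspend fun readConfig(): String = withContext(Dispatchers.IO) {
    Thread.sleep(20) // stands in for blocking disk or network I/O
    "config loaded on ${Thread.currentThread().name}"
}

fun main() = runBlocking<Unit> {
    val caller = Thread.currentThread().name
    val result = readConfig()                      // the block runs on an IO worker thread
    println(result)
    println(Thread.currentThread().name == caller) // true: execution returns to the caller's thread
}
```

Because the dispatcher switch lives inside readConfig, callers do not need to know which thread the work requires; that is what makes the function main-safe.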

Dispatchers.Main

The source code is as follows:

// Dispatchers.kt
public actual object Dispatchers {
    // Omit some code
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    // Omit some code
}

Next, look at the dispatcher property of MainDispatcherLoader. The source code is as follows:

// MainDispatchers.kt
internal object MainDispatcherLoader {

    // Omit some code

    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    // Omit some code

}

MainCoroutineDispatcher is an abstract class. One of its implementation classes is the sealed class HandlerDispatcher, and HandlerContext is the only subclass of HandlerDispatcher. Its source code is as follows:

// MainCoroutineDispatcher.kt
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    // The constructor of HandlerContext; handler is the Handler to use, and name is an optional name for debugging
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }

    // context is the CoroutineContext
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        // Check the value of invokeImmediately and whether we are already on the handler's thread
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    // Dispatch to the thread; context is the CoroutineContext and block is the Runnable
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Call Handler's post method to add the Runnable to the message queue; it runs on the thread the Handler is attached to
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        // Call Handler's postDelayed method to add the Runnable to the message queue and run it after the specified time
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        // Call Handler's postDelayed method to add the Runnable to the message queue and run it after the specified time
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        return object : DisposableHandle {
            override fun dispose() {
                // Call Handler's removeCallbacks method to remove the Runnable from the message queue
                handler.removeCallbacks(block)
            }
        }
    }

    override fun toString(): String =
        if (name != null) {
            if (invokeImmediately) "$name [immediate]" else name
        } else {
            handler.toString()
        }

    override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
    override fun hashCode(): Int = System.identityHashCode(handler)
}

Then we look for a place to call the constructor of HandlerContext. The source code looks like this:

// HandlerDispatcher.kt
@JvmField
@Deprecated("Use Dispatchers.Main instead", level = DeprecationLevel.HIDDEN)
internal val Main: HandlerDispatcher? = runCatching { HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") }.getOrNull()

You can see that Looper.getMainLooper() is passed in. It returns the application's main Looper, which belongs to the application's main thread.

You can see that a lot of handler-related methods are used, which means that it still relies on Android’s messaging mechanism.
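
The same posting pattern can be sketched on a plain JVM, where no Looper or Handler exists. In this illustration a single-thread executor plays the role of the message queue; SingleThreadDispatcher and threadSeenBy are hypothetical names, and kotlinx-coroutines-core is assumed to be on the classpath:

```kotlin
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical dispatcher: a single-thread executor stands in for the Looper's message queue.
class SingleThreadDispatcher(threadName: String) : CoroutineDispatcher() {
    private val executor = Executors.newSingleThreadExecutor { r ->
        Thread(r, threadName).apply { isDaemon = true }
    }

    // Comparable to handler.post(block): enqueue the Runnable and let the owning thread run it.
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        executor.execute(block)
    }
}

// Returns the name of the thread that code inside the dispatcher runs on.
fun threadSeenBy(dispatcher: CoroutineDispatcher): String = runBlocking {
    withContext(dispatcher) { Thread.currentThread().name }
}

fun main() {
    // The printed name starts with "fake-main"; coroutine debug mode may append a suffix.
    println(threadSeenBy(SingleThreadDispatcher("fake-main")))
}
```

Overriding dispatch is the only requirement for a custom dispatcher, which is why HandlerContext can be such a thin wrapper around Handler.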

Dispatchers.Default

The source code is as follows:

// Dispatchers.kt
public actual object Dispatchers {

    // Omit some code

    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    // Omit some code

}

The createDefaultDispatcher function looks like this:

// CoroutineContext.kt
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

If useCoroutinesScheduler is true, DefaultScheduler is used; otherwise CommonPool is used. The source of useCoroutinesScheduler is as follows:

// CoroutineContext.kt
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
    when (value) {
        null, "", "on" -> true
        "off" -> false
        else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
    }
}

The internal variable (internal val) useCoroutinesScheduler is obtained through the JVM's System.getProperty method, with kotlinx.coroutines.scheduler passed in as the key. If the value is null, empty, or on, useCoroutinesScheduler is true; if the value is off, useCoroutinesScheduler is false.
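
The switch can be mimicked with a few lines of plain Kotlin. This is a sketch of the mapping only, not the library's code; parseSchedulerFlag and SCHEDULER_PROPERTY are hypothetical names:

```kotlin
// Property key quoted from the listing above.
const val SCHEDULER_PROPERTY = "kotlinx.coroutines.scheduler"

// Maps the raw property value to a flag using the same cases as the listing above.
fun parseSchedulerFlag(value: String?): Boolean = when (value) {
    null, "", "on" -> true
    "off" -> false
    else -> error("System property '$SCHEDULER_PROPERTY' has unrecognized value '$value'")
}

fun main() {
    // Reads the real JVM property; when it is unset, the value is null and the flag is true.
    println(parseSchedulerFlag(System.getProperty(SCHEDULER_PROPERTY)))
    println(parseSchedulerFlag("off")) // false
}
```

On a regular JVM the property could be set with a -Dkotlinx.coroutines.scheduler=off argument; any value other than on, off, or empty fails fast with an IllegalStateException.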

Let’s take a look at DefaultScheduler. The source code is as follows:

// Dispatcher.kt
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))

    override fun close() {
        throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
    }

    override fun toString(): String = DEFAULT_SCHEDULER_NAME

    @InternalCoroutinesApi
    @Suppress("UNUSED")
    public fun toDebugString(): String = super.toString()
}

It inherits from the ExperimentalCoroutineDispatcher class, which is unstable and may change in the future. Let's look at this class's dispatch function, which is responsible for dispatching tasks to threads. The source code is as follows:

// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
    // Number of core threads
    private val corePoolSize: Int,
    // Maximum number of threads
    private val maxPoolSize: Int,
    // Keep-alive time of idle worker threads (in nanoseconds)
    private val idleWorkerKeepAliveNs: Long,
    // Scheduler name
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    // Omit some code

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            // The dispatch function of coroutineScheduler is called
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }

    // Omit some code

}

Next, look at the CoroutineScheduler class and its dispatch function. The source code is as follows:

// CoroutineScheduler.kt
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    // Number of core threads
    @JvmField val corePoolSize: Int,
    // Maximum number of threads
    @JvmField val maxPoolSize: Int,
    // Keep-alive time of idle worker threads (in nanoseconds)
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    // Scheduler name
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    // Omit some code

    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        // Used to support virtual time
        trackTask()
        val task = createTask(block, taskContext)
        // Try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // The global queue is closed in the last step of shutdown, so no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            // Signal a worker to execute the task
            signalCpuWork()
        } else {
            // Register an additional blocking task
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }

    // Omit some code

}

CoroutineScheduler implements the Executor interface. In Java, the core thread pool implementation class is ThreadPoolExecutor, which also implements the Executor interface, so CoroutineScheduler is the thread pool implementation used by coroutines.

corePoolSize is the number of core threads. It is derived from the number of processors available to the JVM, obtained by calling Runtime.getRuntime().availableProcessors(), and by default is forced to be at least 2.

maxPoolSize is the maximum number of threads. Its lower bound is corePoolSize and its upper bound is (1 shl BLOCKING_SHIFT) - 2, where BLOCKING_SHIFT is 21. The value of Runtime.getRuntime().availableProcessors() multiplied by 2 is clamped between this minimum and maximum.
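
The arithmetic behind these bounds can be checked with a short sketch. corePoolSize and maxPoolSize here are hypothetical helper functions, not the library's declarations, and the multiplier is left as a parameter because the exact factor is an assumption taken from the description above:

```kotlin
const val BLOCKING_SHIFT = 21
const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2 // 2097150

// Core pool size: the processor count, but never fewer than two threads.
fun corePoolSize(processors: Int): Int = processors.coerceAtLeast(2)

// Max pool size: a multiple of the processor count, clamped between the core size
// and the supported maximum. The factor is an assumption of this sketch.
fun maxPoolSize(processors: Int, factor: Int): Int =
    (processors * factor).coerceIn(corePoolSize(processors), MAX_SUPPORTED_POOL_SIZE)

fun main() {
    println(MAX_SUPPORTED_POOL_SIZE) // 2097150
    println(corePoolSize(1))         // 2
    println(maxPoolSize(8, 2))       // 16
    println(maxPoolSize(1, 1))       // 2
}
```

coerceAtLeast and coerceIn are the standard-library functions that implement the "at least" and "between minimum and maximum" rules.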

All the dispatch function does is put the incoming task on a task queue and then either call signalCpuWork to have the task executed or call signalBlockingWork to register an additional blocking task.

Now look at CommonPool. The source code is as follows:

// CommonPool.kt
internal object CommonPool : ExecutorCoroutineDispatcher() {

    // Omit some code

    private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        // Look up the ForkJoinPool class by reflection so that the code also runs on JDK 6 (which lacks it); fall back to a plain thread pool if it is missing
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        // Try to use commonPool unless parallelism is explicitly specified or the debug privatePool mode is on
        if (!usePrivatePool && requestedParallelism < 0) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.takeIf { isGoodCommonPool(fjpClass, it) }
                ?.let { return it }
        }
        // Try to create a private ForkJoinPool instance
        Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
            ?.let { return it }
        // Fall back to a plain thread pool
        return createPlainPool()
    }

    // Omit some code

    // Create a plain thread pool
    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        // Use Java's newFixedThreadPool thread pool
        return Executors.newFixedThreadPool(parallelism) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

    // Omit some code

    // Dispatch to the thread pool
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            (pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            unTrackTask()
            DefaultExecutor.enqueue(block)
        }
    }

    // Omit some code

}

You can see that CommonPool tries to use a ForkJoinPool first and otherwise falls back to Java's newFixedThreadPool thread pool.

The core pool size of the Dispatchers.Default dispatcher equals the number of processors (with a minimum of two threads), so it suits computation-heavy work. It is intended for CPU-intensive tasks off the main thread, such as sorting lists and parsing JSON, similar to the computation thread pool in RxJava.
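
The plain-pool fallback uses only JDK classes, so it can be tried in isolation. The sketch below mirrors the createPlainPool listing with a parallelism parameter and a different thread-name prefix; workerNameOf is a hypothetical helper added for the demonstration:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Fixed-size pool whose threads are named sequentially and marked as daemon threads.
fun createPlainPool(parallelism: Int): ExecutorService {
    val threadId = AtomicInteger()
    return Executors.newFixedThreadPool(parallelism) { runnable ->
        Thread(runnable, "DemoPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
    }
}

// Runs a task on the pool and reports which worker thread executed it.
fun workerNameOf(pool: ExecutorService): String =
    pool.submit(Callable { Thread.currentThread().name }).get(1, TimeUnit.SECONDS)

fun main() {
    val pool = createPlainPool(parallelism = 2)
    println(workerNameOf(pool)) // DemoPool-worker-1
    pool.shutdown()
}
```

Marking the workers as daemon threads means an idle pool does not keep the JVM alive after the rest of the program has finished.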

Dispatchers.IO

The source code is as follows:

// Dispatchers.kt
public actual object Dispatchers {

    // Omit some code

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO

}

IO is a member variable of DefaultScheduler. The source code is as follows:

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    / / call the parent class ExperimentalCoroutineDispatcher blocking function
    val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))

    override fun close() {
        throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
    }

    override fun toString(): String = DEFAULT_SCHEDULER_NAME

    @InternalCoroutinesApi
    @Suppress("UNUSED")
    public fun toDebugString(): String = super.toString()
}
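
The argument passed to blocking is easier to read in isolation. The sketch below reproduces only the arithmetic of `64.coerceAtLeast(AVAILABLE_PROCESSORS)`; `defaultIoParallelism` is a hypothetical helper, and the real code additionally lets a system property override the result.

```kotlin
// Hypothetical helper reproducing the default value only:
// at least 64, or the processor count if that is larger.
fun defaultIoParallelism(availableProcessors: Int): Int =
    64.coerceAtLeast(availableProcessors)

fun main() {
    println(defaultIoParallelism(8))   // prints 64
    println(defaultIoParallelism(128)) // prints 128
}
```

So on a typical phone the IO scheduler allows up to 64 tasks in flight, far more than the number of cores, which suits tasks that spend most of their time blocked.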

Let's look at the blocking function of its superclass, ExperimentalCoroutineDispatcher. The source code is as follows:

// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
    // Number of core threads
    private val corePoolSize: Int,
    // Maximum number of threads
    private val maxPoolSize: Int,
    // How long an idle worker is kept alive (in nanoseconds)
    private val idleWorkerKeepAliveNs: Long,
    // Scheduler name
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    // Omit some code

    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        // Create the LimitingDispatcher object
        return LimitingDispatcher(this, parallelism, TASK_PROBABLY_BLOCKING)
    }

    // Omit some code

}

Now look at the LimitingDispatcher class. The source code is as follows:

// Dispatcher.kt
private class LimitingDispatcher(
    // The dispatcher is a final variable of type ExperimentalCoroutineDispatcher
    val dispatcher: ExperimentalCoroutineDispatcher,
    val parallelism: Int,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    // Omit some code

    // Dispatch the task by calling dispatch(block: Runnable, tailDispatch: Boolean)
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Reserve a slot for an in-flight task
            val inFlight = inFlightTasks.incrementAndGet()

            // Fast path, if the parallelism limit is not reached, the task is dispatched and returned
            if (inFlight <= parallelism) {
                // Call the dispatchWithContext function of ExperimentalCoroutineDispatcher
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }

            // The task is added to the queue when the parallelism limit is reached
            queue.add(taskToSchedule)

            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            taskToSchedule = queue.poll() ?: return
        }
    }

    // Omit some code

}

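LimitingDispatcher does not create threads of its own: it only caps the number of in-flight tasks and forwards them to the ExperimentalCoroutineDispatcher it wraps, which here is DefaultScheduler. In other words, the Dispatchers.Default scheduler and the Dispatchers.IO scheduler share the same thread pool.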
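
The limiting idea can be sketched with plain JDK types. The following is a simplified, hypothetical stand-in (`LimitingExecutor`, `runThenRelease` and `maxConcurrentTasks` are names invented for this example), not the library's implementation: it reuses the in-flight counter and queue logic from the dispatch function above and adds an assumed release step that hands a freed slot to a parked task.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified stand-in for LimitingDispatcher. It owns no threads;
// it only caps how many of its tasks run at once on the shared executor.
class LimitingExecutor(
    private val shared: Executor,
    private val parallelism: Int
) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = AtomicInteger()

    override fun execute(command: Runnable) {
        var taskToSchedule = command
        while (true) {
            // Reserve a slot; under the limit, hand the task to the shared pool.
            if (inFlightTasks.incrementAndGet() <= parallelism) {
                val task = taskToSchedule
                shared.execute { runThenRelease(task) }
                return
            }
            // Over the limit: park the task and give the slot back.
            queue.add(taskToSchedule)
            if (inFlightTasks.decrementAndGet() >= parallelism) return
            // A slot was freed in the meantime, so retry with a parked task.
            taskToSchedule = queue.poll() ?: return
        }
    }

    private fun runThenRelease(task: Runnable) {
        try {
            task.run()
        } finally {
            val parked = queue.poll()
            if (parked != null) {
                // Pass the slot directly to a parked task.
                shared.execute { runThenRelease(parked) }
            } else {
                inFlightTasks.decrementAndGet()
                // Re-check so a task parked during the decrement is not stranded.
                queue.poll()?.let { execute(it) }
            }
        }
    }
}

// Runs 20 short tasks through a limit of 2 on a 4-thread pool and returns
// the highest number of tasks observed running at the same time.
fun maxConcurrentTasks(): Int {
    val pool = Executors.newFixedThreadPool(4)
    val limited = LimitingExecutor(pool, parallelism = 2)
    val running = AtomicInteger()
    val maxSeen = AtomicInteger()
    val done = CountDownLatch(20)
    repeat(20) {
        limited.execute(Runnable {
            val now = running.incrementAndGet()
            maxSeen.updateAndGet { prev -> maxOf(prev, now) }
            Thread.sleep(5)
            running.decrementAndGet()
            done.countDown()
        })
    }
    done.await()
    pool.shutdown()
    return maxSeen.get()
}

fun main() {
    println("max concurrent tasks: ${maxConcurrentTasks()}")
}
```

Although the pool has four threads, the reported maximum never exceeds the limit of 2. Several such wrappers with different limits could sit on top of one pool, which is the relationship between the IO and Default schedulers described above.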

Specify CoroutineScope

When defining a coroutine, you must specify its CoroutineScope. A CoroutineScope manages one or more related coroutines and can be used to start new coroutines within that scope.

Unlike the scheduler, the CoroutineScope does not run coroutines.

An important job of a CoroutineScope is to stop coroutine execution when the user leaves a content area of the application, ensuring that all in-flight operations are stopped correctly.
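
A minimal sketch of this behavior outside Android, assuming kotlinx-coroutines-core is available: `ScreenController` is a hypothetical owner that creates its own scope and cancels it when the user leaves, which cancels every coroutine launched in that scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical screen-like owner: it holds a scope and cancels it when it goes away.
class ScreenController {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Launches a coroutine in the owner's scope and returns its Job.
    fun startPolling() = scope.launch {
        while (isActive) {
            delay(100) // stands in for periodic work
        }
    }

    // Cancels the scope and, with it, every coroutine started in it.
    fun onLeave() = scope.cancel()
}

// Starts polling, leaves the screen, and reports whether the job was cancelled.
fun pollingIsCancelledOnLeave(): Boolean = runBlocking {
    val controller = ScreenController()
    val job = controller.startPolling()
    controller.onLeave()
    job.join()
    job.isCancelled
}

fun main() {
    println(pollingIsCancelledOnLeave()) // prints true
}
```

The scope itself runs nothing. The dispatcher in its context decides where the coroutine runs, while the scope's Job decides how long it may live.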

On the Android platform, CoroutineScope implementations can be tied to the lifecycle of components such as Lifecycle and ViewModel. This avoids memory leaks and avoids doing extra work for activities or fragments that are no longer relevant to the user. Examples are ViewModelScope, LifecycleScope, and liveData.

Add a KTX dependency
  • For ViewModelScope, use androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01 or later.
  • For LifecycleScope, use androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01 or later.
  • For liveData, use androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01 or later.
Lifecycle-aware CoroutineScope
ViewModelScope

A ViewModelScope is defined for each ViewModel. If the ViewModel is cleared, any coroutine started in this scope is automatically cancelled. This is useful when the work only needs to be done while the ViewModel is active, as shown in the following code:

class MyViewModel : ViewModel() {

    init {
        viewModelScope.launch {
            // When the ViewModel is cleared, coroutines started in this scope are automatically cancelled
        }
    }
}

LifecycleScope

A LifecycleScope is defined for each Lifecycle object. Coroutines started within this scope are cancelled automatically when the Lifecycle is destroyed. You can access the CoroutineScope of a Lifecycle through the lifecycle.coroutineScope or lifecycleOwner.lifecycleScope properties, as shown in the sample code below:

class MyFragment : Fragment() {

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        viewLifecycleOwner.lifecycleScope.launch {
            // Coroutines started within this scope will be cancelled automatically when Lifecycle is destroyed
        }
    }
}

Even though the CoroutineScope provides a proper way to cancel long-running operations automatically, in some cases you may need to suspend execution of a code block until the Lifecycle reaches a certain state. For example, to run a FragmentTransaction, the Fragment's Lifecycle must be at least STARTED. For these cases, Lifecycle provides lifecycle.whenCreated, lifecycle.whenStarted, and lifecycle.whenResumed. If the Lifecycle is not at least in the minimum required state, any coroutine running inside these blocks is suspended, as shown in the code sample below:

class MyFragment : Fragment() {

    init {
        // It is safe to launch in the Fragment's constructor
        lifecycleScope.launch {
            whenCreated {
                // This code block executes only if the Fragment lifecycle is at least CREATED and other suspend functions can be called
            }

            whenStarted {
                // This code block is executed only if the Fragment's lifecycle is at least STARTED and other suspend functions can be called
            }

            whenResumed {
                // This code block executes only if the Fragment's lifecycle is at least RESUMED, and other suspend functions can be called
            }
        }
    }
}

liveData

When using LiveData, we may need to compute values asynchronously, for example when user information is fetched and then displayed on screen. In this case, we can use the liveData builder function to call a suspend function and return the result as a LiveData object, as shown in the following example code:

val userInfoData: LiveData<UserInfoData> = liveData {
    // The getUserInfo function is a suspend function
    val data = remoteSource.getUserInfo()
    emit(data)
}

The liveData builder serves as a structured concurrency primitive between coroutines and LiveData.

When LiveData becomes active, the code block starts executing. When LiveData becomes inactive, the code block is automatically cancelled after a configurable timeout. If the block is cancelled before completion, it restarts once LiveData becomes active again; if it completed successfully in a previous run, it does not restart. Note that the block restarts only if it was cancelled automatically. It does not restart if it was cancelled for any other reason (such as throwing a CancellationException).

We can emit multiple values from the code block. Each call to the emit function suspends execution of the block until the LiveData value has been set on the main thread.

My GitHub: TanJiaJunBeyond

Common Android Framework: Common Android framework

My Juejin: Tan Jiajun

My Jianshu: Tan Jiajun

My CSDN: Tan Jiajun