Moment For Technology

Talk a little bit about coroutines in Kotlin

Posted on Sept. 23, 2022, 2:39 a.m. by Todd Wright
Category: android Tag: android kotlin

1. Questions about asynchrony

You've used threads, right? How do you handle I/O, network requests, and other long-running work? Callbacks? Rx?

We usually reach for callbacks to handle asynchronous work, but as the code evolves we find that:

  1. Callbacks split one control flow into several, which increases code complexity
  2. Deep nesting leads to callback hell, and the number of control flows can grow exponentially
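To make the nesting problem concrete, here is a minimal sketch using only the Kotlin standard library. The three callback-based functions (`requestToken`, `createPost`, `processPost`) are hypothetical stand-ins for an asynchronous API and call back synchronously to keep the example small.

```kotlin
// Hypothetical callback-based API: each step reports its result through a callback.
fun requestToken(callback: (String) -> Unit) = callback("token")
fun createPost(token: String, text: String, callback: (String) -> Unit) = callback("post[$token:$text]")
fun processPost(post: String, callback: (String) -> Unit) = callback("processed $post")

// Three dependent steps already need three levels of nesting.
fun postItem(text: String, onDone: (String) -> Unit) {
    requestToken { token ->
        createPost(token, text) { post ->
            processPost(post) { result ->
                onDone(result)
            }
        }
    }
}

fun main() {
    postItem("hello") { println(it) } // prints "processed post[token:hello]"
}
```

Every additional dependent step adds another level, and error handling would have to be repeated inside each callback.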

Future

Java 5 introduced the Future interface to represent the eventual result of an asynchronous operation. It encapsulates the details of asynchrony and avoids forked control flow and nested callbacks, but calling get() on the main thread blocks it until the result is ready.

fun testCoroutine() {
    val future = requestDataAsync()
    doSomethingElse()
    processData(future.get())
}

Reactive

Java 8's CompletableFuture and the Rx family of libraries solve nested callbacks in a reactive style, but they do not solve the forking of control flow. RxJava is an excellent library with rich operators and a powerful ability to handle complex event streams, but it has a learning cost, and there is no guarantee that every colleague is equally fluent in it.

Question: is there a way to write asynchronous code more concisely, in a way that matches how people think, as if it were synchronous logic? Let's look at Kotlin coroutines.

2. Coroutine

Official description: Coroutines simplify asynchronous programming by putting the complexity into libraries. The program logic can be expressed sequentially in a coroutine, and the underlying library takes care of the asynchrony for us. The library can wrap the relevant parts of user code as callbacks, subscribe to the relevant events, and schedule execution on different threads (or even different machines), while the code stays as simple as if it ran sequentially.

Coroutines are like very lightweight threads. Threads are scheduled by the system, and the overhead of switching or blocking a thread is high. Coroutines run on threads, but a suspended coroutine does not block its thread, which makes suspension cheap, and coroutines are controlled by the developer (they can be suspended and resumed explicitly). In that sense coroutines resemble user-level threads: they are so lightweight that you can create many of them on one thread.

Scheduling between tasks is cooperative, not preemptive.

suspend fun testCoroutine() {
    GlobalScope.launch(Main) {
        val dataDeferred = getDataAsync()
        doSomethingElse()
        val data = dataDeferred.await()
        processData(data)
    }
    Thread.sleep(1000)
    doSomething()
}

fun getDataAsync(): Deferred<Unit> {
    return GlobalScope.async {
        // Start an asynchronous coroutine to execute a time-consuming task
        requestData()
    }
}
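The "lightweight" claim can be tried out with a short sketch. It assumes the kotlinx-coroutines-core library is on the classpath; `runMany` is a hypothetical helper name. All coroutines here share the single thread that runBlocking occupies.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Launches n coroutines that each suspend for 100 ms, then reports how many finished.
fun runMany(n: Int): Int = runBlocking {
    val finished = AtomicInteger()
    coroutineScope {                 // returns only after every child has completed
        repeat(n) {
            launch {
                delay(100)           // suspends the coroutine without blocking the thread
                finished.incrementAndGet()
            }
        }
    }
    finished.get()
}

fun main() {
    println(runMany(10_000))
}
```

Because delay suspends rather than blocks, the waits overlap, so the run does not take anywhere near 10,000 times 100 ms.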

3. Preemptive vs. cooperative

On a single CPU core, only one process executes at a time. With so many processes, how should CPU time slices be allocated?

Cooperative multitasking:

Early operating systems used cooperative multitasking: a process gives up execution voluntarily. For example, when the current process has to wait for an I/O operation, it yields the CPU and the system schedules the next process. The hidden danger is that a single process can monopolize the CPU. Processes vary in quality, and if a poorly written one falls into an infinite loop or a deadlock, the whole system is paralyzed.

Preemptive multitasking:

Execution is decided by the operating system, which can take control away from any process and hand it to another. The system allocates time slices to each process fairly. When a process uses up its slice, it is put to sleep. Even before the slice is used up, the system can force the process to sleep if a more urgent event must run first. Threads are also scheduled preemptively, which brings a new problem: thread safety.
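Cooperative scheduling can be sketched with the standard library's `iterator` builder, where each `yield` is the point at which a task voluntarily hands control back. The `task` and `runCooperatively` names are hypothetical, and this toy round-robin loop only illustrates the idea; it is not how an operating system or kotlinx.coroutines schedules work.

```kotlin
// Each task is a generator; yield(Unit) is where it voluntarily gives up control.
fun task(name: String, steps: Int, log: MutableList<String>): Iterator<Unit> = iterator {
    for (i in 1..steps) {
        log += "$name step $i"
        yield(Unit) // cooperative hand-off back to the scheduler
    }
}

// A minimal round-robin scheduler: run a task until its next yield, then requeue it.
fun runCooperatively(): List<String> {
    val log = mutableListOf<String>()
    val ready = ArrayDeque(listOf(task("A", 2, log), task("B", 3, log)))
    while (ready.isNotEmpty()) {
        val current = ready.removeFirst()
        if (current.hasNext()) {   // resumes the task until it yields or finishes
            current.next()
            ready.addLast(current)
        }
    }
    return log
}

fun main() {
    println(runCooperatively())
    // [A step 1, B step 1, A step 2, B step 2, B step 3]
}
```

A task that never reached a `yield` would starve every other task, which is exactly the hidden danger of cooperative multitasking described above.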

4. Coroutine vs Thread

The relationship between process, thread and coroutine is shown as follows:

Let's simulate the flow of thread and coroutine execution logic:

  • How many logical flows a multithreaded program can run concurrently depends heavily on how many threads it requests. Threads are relatively heavy: each one has a large footprint, and scheduling it means moving in and out of the kernel. In this model, threads that block and wait waste a lot of memory.

  • A coroutine does not block its thread, as shown in Figure 1. When a coroutine would otherwise block, either a ready coroutine is picked from the pool to run, or the suspended coroutine specifies which one runs next, so no CPU time is wasted.

On the difference between threads and coroutines:

  1. Coroutines are compiler-level (user-level), while threads are operating-system-level (kernel-level). Coroutines are usually a mechanism implemented by the compiler. Threads may look like a language-level feature too, but the operating system provides them first and then exposes them to the user through an API, which is the difference.
  2. Threads or processes can usually do what coroutines do, but they need more locking and communication.
  3. Threads are preemptive, while coroutines are cooperative: a coroutine must give up control before another one can run. On a given thread, therefore, only one coroutine runs at a time, which is equivalent to the capability of a single thread.
  4. A thread is a share of CPU resources. A coroutine is an organized flow of code. A coroutine needs a thread to carry and run it.

Using concurrency tools such as locks in coroutine code usually adds needless complexity. When writing coroutine code, avoid referencing variables from outer scopes; use channels to communicate between coroutines instead.

5. Coroutine advantages and disadvantages

  1. Lightweight: You can run many coroutines on a single thread because coroutines support suspension, which does not block the thread running them. Suspension uses less memory than blocking and supports many concurrent operations.

  2. Fewer memory leaks: Structured concurrency runs multiple operations within a scope.

  3. Built-in cancellation support: Cancellation propagates automatically through the running coroutine hierarchy.

  4. Jetpack integration: Many Jetpack libraries include extensions that provide full coroutine support. Some libraries also provide their own coroutine scope that you can use for structured concurrency.

Are there really no drawbacks to coroutines? First, because coroutines run on threads, thread-safety issues still exist, although coroutines avoid most of them. In addition, under high concurrency, coroutines can show problems that are hard to diagnose.

6. Coroutine builders

There are three types of builders that help us create coroutines:

1. launch: Job

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch returns no result value, only a Job, which knows the status of the task but does not carry a result. launch is commonly used to run coroutines whose result is not needed (such as deleting or creating a file). launch is an extension function of CoroutineScope and takes three arguments: 1. the coroutine context; 2. the coroutine start mode; 3. the coroutine body: block is a function literal with receiver, and the receiver is a CoroutineScope.

Demo:

GlobalScope.launch(Main) {
    val dataDeferred = getDataAsync()
    doSomethingElse()
    val data = dataDeferred.await()
    processData(data)
}

2. async/await: Deferred

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

async has a return value, a Deferred. Deferred extends Job: it has everything a Job has and can also carry back a result, which a Job cannot. async is also an extension function of CoroutineScope and takes the same three arguments as launch: 1. the coroutine context; 2. the coroutine start mode; 3. the coroutine body.

Demo:

return GlobalScope.async { requestData() }

3. runBlocking: T

@Throws(InterruptedException::class)
public fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): T { ...... }

runBlocking starts a new coroutine and blocks the current thread until its body and all of its child coroutines have finished. It is designed to make libraries written in the suspending style usable from regular blocking code, and is often used in main functions and tests.

Demo:

fun test() = runBlocking<Unit> {
    val job = GlobalScope.launch {
        delay(3000L)
        println("sub World!")
    }
    // A coroutine launched in GlobalScope is not a child of runBlocking,
    // so it has to be joined explicitly for runBlocking to wait for it.
    job.join()
}
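The three builders can be compared side by side in one short sketch, assuming kotlinx-coroutines-core is available. `buildersDemo` is a hypothetical name: launch returns a Job without a result, async returns a Deferred whose value is read with await(), and runBlocking bridges into blocking code.

```kotlin
import kotlinx.coroutines.*

fun buildersDemo(): Int = runBlocking {
    // launch: fire-and-forget work, only a Job handle comes back
    val job: Job = launch { delay(10) }

    // async: each coroutine produces a value carried by a Deferred
    val a: Deferred<Int> = async { delay(10); 20 }
    val b: Deferred<Int> = async { delay(10); 22 }

    job.join()               // wait for completion, no result
    a.await() + b.await()    // wait for completion and read the results
}

fun main() {
    println(buildersDemo()) // prints 42
}
```

Both async coroutines run concurrently, so the two 10 ms delays overlap instead of adding up.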

7. CoroutineStart start mode

public enum class CoroutineStart {
    DEFAULT,
    LAZY,
    ATOMIC,
    @ExperimentalCoroutinesApi // Since 1.0.0, no ETA on stability
    UNDISPATCHED;

    @InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }

    @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY
}

Passed as an argument to the builders launch and async, it defines when the coroutine builder starts executing the coroutine, which covers a range of usage scenarios.

There are four start modes:

Start mode      Role
DEFAULT         Default mode; the coroutine body is scheduled for execution immediately
LAZY            Starts only when needed, for example on start(), join(), or await()
ATOMIC          The coroutine body is scheduled immediately, but cannot be cancelled before it starts running
UNDISPATCHED    The coroutine body is executed immediately in the current thread until the first suspension point
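The difference between DEFAULT and LAZY can be observed with a small sketch, assuming kotlinx-coroutines-core is available; `startModeDemo` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*

fun startModeDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // LAZY: created but not scheduled until someone starts or joins it
    val lazyJob = launch(start = CoroutineStart.LAZY) { log += "lazy body" }

    // DEFAULT: scheduled for execution right away
    launch { log += "default body" }

    delay(50)                          // gives the DEFAULT coroutine time to run
    log += "lazy active before start: ${lazyJob.isActive}"
    lazyJob.join()                     // join() starts a LAZY coroutine and waits for it
    log
}

fun main() {
    println(startModeDemo())
    // [default body, lazy active before start: false, lazy body]
}
```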

8. CoroutineContext

public interface CoroutineContext {

    public operator fun <E : Element> get(key: Key<E>): E?

    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {

        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

CoroutineContext is an interface that is essentially a type-safe heterogeneous map: a collection of Element instances in which each Element has a key, compared by object reference, that uniquely identifies it. The context is the basic structural unit of Kotlin coroutines, and using it well matters for, among other things, correct threading behavior, thread switching, interception, lifecycles, exception handling, and debugging.
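The map-like behavior can be tried with the standard library alone. In this sketch `UserId` and `TraceTag` are hypothetical element types invented for illustration; the companion object of each class serves as its key, the same pattern Job uses.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical context elements; each companion object is the element's key.
class UserId(val id: Int) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

class TraceTag(val tag: String) : AbstractCoroutineContextElement(TraceTag) {
    companion object Key : CoroutineContext.Key<TraceTag>
}

fun contextDemo(): List<Any?> {
    val ctx = EmptyCoroutineContext + UserId(1) + TraceTag("a")
    val replaced = ctx + UserId(2)        // same key: the new element replaces the old one
    val removed = ctx.minusKey(TraceTag)  // drop one element by key
    return listOf(ctx[UserId]?.id, ctx[TraceTag]?.tag, replaced[UserId]?.id, removed[TraceTag])
}

fun main() {
    println(contextDemo()) // [1, a, 2, null]
}
```

Lookups are type-safe: `ctx[UserId]` has type `UserId?`, so no cast is needed to read `id`.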

Class diagram of the common coroutine interfaces and classes:

9. Job

public interface Job : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key<Job> {
        init {
            CoroutineExceptionHandler
        }
    }

    // ------------ state query ------------
    public val isActive: Boolean
    public val isCompleted: Boolean
    public val isCancelled: Boolean
    @InternalCoroutinesApi
    public fun getCancellationException(): CancellationException

    // ------------ state update ------------
    public fun start(): Boolean
    public fun cancel(cause: CancellationException? = null)

    // ------------ parent-child ------------
    public val children: Sequence<Job>
    @InternalCoroutinesApi
    public fun attachChild(child: ChildJob): ChildHandle

    // ------------ state waiting ------------
    public suspend fun join()

    ......
}

A Job is also a context element. It models an executable unit of work that has a lifecycle and a place in a task hierarchy.

The inheritance chain is Job -> Element -> CoroutineContext.

  1. Jobs can be organized into parent-child hierarchies, with the following important properties:
     1.1 When the parent Job is cancelled, all of its child Jobs are cancelled immediately.
     1.2 When a child Job throws an exception other than CancellationException, the parent Job is cancelled immediately.
  2. Like a Thread, a Job can be in several states. The state transitions are as follows:

[Figure: Job state transitions, including the move from Active to Completing]
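The state flags and the parent-to-child cancellation rule can be observed with a short sketch, assuming kotlinx-coroutines-core is available. `jobDemo` and the local `flags` helper are hypothetical names.

```kotlin
import kotlinx.coroutines.*

fun jobDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    fun Job.flags() = "active=$isActive completed=$isCompleted cancelled=$isCancelled"

    val parent = launch {
        launch { delay(10_000) }   // child of `parent`
        delay(10_000)
    }
    delay(50)                      // let both coroutines start
    val child = parent.children.first()

    log += "parent: " + parent.flags()
    parent.cancelAndJoin()         // cancelling the parent also cancels the child
    log += "parent: " + parent.flags()
    log += "child: " + child.flags()
    log
}

fun main() {
    jobDemo().forEach(::println)
    // parent: active=true completed=false cancelled=false
    // parent: active=false completed=true cancelled=true
    // child: active=false completed=true cancelled=true
}
```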

10. CoroutineScope

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

CoroutineScope is the scope used to manage coroutines:

  1. Create/start/switch coroutines: builders such as launch and async are defined as extension functions on it, and functions such as withContext run their block with a CoroutineScope receiver. These functions also define how a child coroutine inherits the context when it is launched.
  2. Manage the coroutine lifecycle: the cancel() function cancels the current scope and all coroutines in it.

It is recommended that classes with a lifecycle implement CoroutineScope, so that all of their coroutines can be managed together and cancelled when the lifecycle ends.

class CoroutinesActivity : BaseTestActivity(), CoroutineScope {
    var job: Job = Job()

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_coroutine)
        findViewById<TextView>(R.id.tv_test5).setOnClickListener { loadDataAsync() }
    }

    private fun loadDataAsync() = launch {
        val ioData = async {
            // I/O operator
        }
        // do something else concurrently with I/O
        val data = ioData.await()
    }

    override fun onDestroy() {
        super.onDestroy()
        job.cancel()
    }
}

11. suspend

suspend is a modifier that marks a function as a suspending function. The compiler applies a CPS transformation to such functions during compilation. A function marked with suspend can be called only from a coroutine body or from another suspending function.

Principle:

  1. A call to a suspending function or suspending lambda implicitly passes an extra argument of type Continuation. It encapsulates the code that runs after the coroutine resumes, much like a callback interface.

    suspend fun requestData(): Bean { ... }

    Object requestData(Continuation cont) { ... }

  2. Instead of ordinary callbacks, the coroutine implementation uses a state machine (the State pattern) to handle the different suspension points. The CPS (Continuation-Passing Style) transformation looks roughly like this:

    Normal code logic:

    suspend fun requestData(): Bean { ... }
    suspend fun createPost(bean: Bean, item: Item): Post { ... }
    suspend fun processPost(post: Post) { ... }

    fun postItem(item: Item) {
        GlobalScope.launch {
            val bean = requestData()
            val post = createPost(bean, item)
            processPost(post)
        }
    }

    // The inner class generated after compilation looks roughly like this.
    // The start of the coroutine and each suspension point generate one case.
    final class postItem$1 extends SuspendLambda ... {
        public final Object invokeSuspend(Object result) {
            ...
            switch (this.label) {
                case 0:
                    this.label = 1;
                    bean = requestData(this)
                    break;
                case 1:
                    this.label = 2;
                    Bean bean = result;
                    post = createPost(bean, this.item, this)
                    break;
                case 2:
                    Post post = result;
                    processPost(post)
                    break;
            }
        }
    }

  3. Each suspension point in the code, together with the initial entry point, is converted into one state of the state machine. Resuming the coroutine is really just a jump to the next state.

  4. Suspending functions guarantee sequential execution inside a coroutine by splitting execution into continuation fragments and using the state machine to run the fragments in order.
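The implicit Continuation can be made visible using only the standard library's kotlin.coroutines package. In this sketch `fetchAsync` is a hypothetical callback API, `fetch` wraps it as a suspending function, and `runSuspend` starts a coroutine by hand with an explicit completion Continuation, which is roughly what a builder such as launch does internally.

```kotlin
import kotlin.coroutines.*

// Hypothetical callback-style API (calls back synchronously to keep the sketch small).
fun fetchAsync(callback: (String) -> Unit) = callback("data")

// suspendCoroutine exposes the hidden Continuation so a callback can resume it.
suspend fun fetch(): String = suspendCoroutine { cont ->
    fetchAsync { value -> cont.resume(value) }
}

// Starts a coroutine without any builder: the completion Continuation receives the result.
fun runSuspend(): String {
    var out = ""
    val body: suspend () -> String = { fetch() + "!" }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        out = result.getOrThrow()
    })
    return out
}

fun main() {
    println(runSuspend()) // prints "data!"
}
```

The code after `fetch()` (appending "!") is the part the Continuation encapsulates: it runs only when `cont.resume(value)` is called.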

12. CPS State Machine (Continuation-Passing Style)

Most coroutine implementations rely on some kind of runtime yield mechanism. The JVM has no yield instruction, so Kotlin implements its coroutines through a CPS transformation instead.

After compilation, each suspending function generates an inner state-machine class that implements Continuation and holds the function's local variables and execution point. The state machine stands in for yield in two ways:

  1. It remembers the execution point (a switch/case): each suspension point corresponds to a label, and on the next resume the label, which has been incremented, selects where execution continues.

  2. It remembers the local variables as they were when the function was suspended.

    public interface Continuation<in T> {
        public val context: CoroutineContext
        public fun resumeWith(result: Result<T>)
    }

    resumeWith resumes a suspended coroutine from the point where it was suspended.
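The two jobs of the state machine, remembering the execution point and remembering local variables, can be imitated by hand in plain Kotlin. `SumTwoInputs` is a hypothetical class written for illustration; the class the compiler actually generates differs in detail.

```kotlin
// Hand-written analogue of a compiler-generated state machine (illustrative only).
class SumTwoInputs {
    private var label = 0   // execution point: which suspension point to resume from
    private var first = 0   // local variable preserved across the suspension

    // Plays the role of resumeWith: continues from the saved label.
    // Returns null while "suspended" and the final result once finished.
    fun resume(value: Int): Int? = when (label) {
        0 -> { first = value; label = 1; null }
        1 -> { label = 2; first + value }
        else -> error("already completed")
    }
}

fun stateMachineDemo(): List<Int?> {
    val machine = SumTwoInputs()
    return listOf(machine.resume(40), machine.resume(2))
}

fun main() {
    println(stateMachineDemo()) // [null, 42]
}
```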

13. suspend vs CoroutineScope

In Kotlin:

  1. Every function declared as a CoroutineScope extension returns immediately, while the work it specifies runs concurrently. This is one of the reasons runBlocking is not a CoroutineScope extension.
  2. Every function declared only as suspend waits for its internal logic to complete before returning to the caller.

A suspending function should return only after all of its work has completed. If it needs to run things concurrently, it should wait for them to finish before returning by using coroutineScope {}, rather than adding a CoroutineScope receiver to the function signature.

14. ContinuationInterceptor

public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }

    public override operator fun <E : CoroutineContext.Element> get(key: ...
}

The continuation interceptor is also a context element. It can be used to implement thread switching, hooks, logging, and similar features.

Let's define a custom interceptor:

class MyContinuationInterceptor : ContinuationInterceptor {
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}

class MyContinuation<T>(val continuation: Continuation<T>) : Continuation<T> {
    override val context = continuation.context
    override fun resumeWith(result: Result<T>) {
        println("jason, result=$result")
        continuation.resumeWith(result)
    }
}

GlobalScope.launch {
    launch(MyContinuationInterceptor()) {
        val deferred = async { println(2) }
        val result = deferred.await()
        println("5. $result")
    }.join()
}

Note: a context can hold only one interceptor. If more than one is added, the later one replaces the earlier one.
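The one-interceptor rule follows from the fact that every interceptor shares the same context key. A standard-library-only sketch can show it; `LoggingInterceptor` and `interceptorDemo` are hypothetical names, and the coroutine is started by hand with `startCoroutine` to avoid any extra dependency.

```kotlin
import kotlin.coroutines.*

// Hypothetical interceptor that records its name whenever a continuation is resumed.
class LoggingInterceptor(val name: String, val log: MutableList<String>) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        Continuation(continuation.context) { result ->
            log += "$name resumed"
            continuation.resumeWith(result)
        }
}

fun interceptorDemo(): List<String> {
    val log = mutableListOf<String>()
    // Both elements use the key ContinuationInterceptor, so "second" replaces "first".
    val ctx = LoggingInterceptor("first", log) + LoggingInterceptor("second", log)
    val body: suspend () -> Unit = { log += "body" }
    body.startCoroutine(Continuation(ctx) { })
    return log
}

fun main() {
    println(interceptorDemo()) // [second resumed, body]
}
```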

14. CoroutineDispatcher

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

    public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    @InternalCoroutinesApi
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)

    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }

    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other

    ......
}

Official description: The coroutine context includes a coroutine dispatcher that determines which thread or threads the corresponding coroutine uses for its execution. The dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.

All coroutine builders such as launch and async accept an optional CoroutineContext parameter, which can be used to explicitly specify the dispatcher and other context elements for the new coroutine.

A dispatcher is essentially an implementation of both a coroutine context element and the interceptor interface, so a dispatcher is a kind of interceptor. Its interceptContinuation method wraps the continuation in a DispatchedContinuation, and resuming that wrapper calls dispatch, which is how the coroutine gets scheduled.

val job = GlobalScope.launch(Dispatchers.Main + CoroutineName("Mycoroutine")) {
    for (i in 10 downTo 1) {
        hello.text = "Countdown $i ..."
        delay(500)
    }
    hello.text = "Done!"
}

Kotlin provides several dispatchers, and we usually use the official ones:

Dispatcher      Role
Main            The UI thread
Default         Shared background thread pool, intended for CPU-intensive work
IO              Shared background thread pool, intended for blocking I/O
Unconfined      Not confined to any thread; depends on the calling thread or on the suspending function that resumes it

// A custom coroutine dispatcher can also be created in this simple way
val myDispatcher = Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }.asCoroutineDispatcher()

15. withContext

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // always check for cancellation of new context
        newContext.checkCompletion()
        ......
        // SLOW PATH -- use new dispatcher
        val coroutine = DispatchedCoroutine(newContext, uCont)
        coroutine.initParentJob()
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

withContext is a useful function: instead of creating a new coroutine, it switches to the specified thread to run a block of code, and automatically switches back to the original thread to continue once the block has finished.

coroutineScope.launch(Dispatchers.Main) {      // Start on the UI thread
    val image = withContext(Dispatchers.IO) {  // Switch to an IO thread
        getImage(imageId)                      // Runs on the IO thread
    }
    avatarIv.setImageBitmap(image)             // Back on the UI thread to update the UI
}
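Outside Android, the same switch-and-return behavior can be checked with a custom single-thread dispatcher. This sketch assumes kotlinx-coroutines-core is available; `withContextDemo` is a hypothetical name. In debug mode the library may append the coroutine name to the thread name, so the check below only looks at the prefix.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun withContextDemo(): Pair<String, Boolean> {
    val worker = Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }
        .asCoroutineDispatcher()
    return worker.use { dispatcher ->          // closes the executor afterwards
        runBlocking {
            val caller = Thread.currentThread()
            // Runs the block on the worker thread and returns its value
            val inside = withContext(dispatcher) { Thread.currentThread().name }
            // After withContext, execution is back on the calling thread
            val backOnCaller = Thread.currentThread() === caller
            inside to backOnCaller
        }
    }
}

fun main() {
    val (inside, backOnCaller) = withContextDemo()
    println("inside=$inside backOnCaller=$backOnCaller")
}
```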

16. yield

I was also confused by yield at first. Its doc comment reads:

Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run if possible.

Compare this with yield in JavaScript:

It returns a value, and the next time the function is entered, execution continues from that point.

As you can see, Kotlin also provides yield, but its meaning differs from yield in other languages. Here it simply gives way: it suspends the current coroutine and lets other coroutines run. There are further details and restrictions in actual use.

Question: when is it useful? In tasks that are CPU-intensive and long-running.
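A small sketch of yield in action, assuming kotlinx-coroutines-core is available: two coroutines share the single thread of runBlocking and take turns only because each one calls yield(). `yieldDemo` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*

fun yieldDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val a = launch { repeat(2) { log += "A$it"; yield() } } // give way after each step
    val b = launch { repeat(2) { log += "B$it"; yield() } }
    joinAll(a, b)
    log
}

fun main() {
    println(yieldDemo()) // [A0, B0, A1, B1]
}
```

Without the yield() calls, the first coroutine would finish both steps before the second one got a turn, because nothing else suspends it.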

17. Channel

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        ......
    }

A Channel provides a way to transfer a stream of values. It is used to pass data between coroutines, which can send to and receive from the same Channel.

A Channel is conceptually very similar to a BlockingQueue. One key difference is that it replaces the blocking put operation with a suspending send, and the blocking take operation with a suspending receive.

Demo:

val channel = Channel<Int>()
launch(IO) {
    // We'll just square integers 5 times and send
    for (x in 1..5) channel.send(x * x)
}
repeat(5) { println(channel.receive()) }
println("Done!")

18. Exception

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
        override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

There are two kinds of exceptions in Kotlin coroutines:

  1. CancellationException, thrown by suspending functions inside a coroutine when the coroutine is cancelled. It is ignored by all handlers, so it serves only as an additional source of debugging information, which a catch block can capture.
  2. Other exceptions. 2.1 async: exposes the exception to the user, who catches it when calling deferred.await(). An exception thrown inside launch, by contrast, propagates automatically as an uncaught exception, as the demo below shows.

We can catch exceptions with try/catch, or use a CoroutineExceptionHandler as a global exception handler, which is very convenient.

Demo:

fun exceptionTest() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) {
        throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
    }
    joinAll(deferred, job)
}

I/System.out: Caught java.lang.AssertionError
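Both handling styles can be combined in one sketch, assuming kotlinx-coroutines-core is available. `exceptionDemo` is a hypothetical name, and a scope built from SupervisorJob plus the handler is used here instead of GlobalScope so that the first failure does not cancel the second coroutine.

```kotlin
import kotlinx.coroutines.*

fun exceptionDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val handler = CoroutineExceptionHandler { _, e ->
        log += "handler: ${e.javaClass.simpleName}"
    }
    val scope = CoroutineScope(SupervisorJob() + handler)

    // launch: the uncaught exception goes to the CoroutineExceptionHandler
    scope.launch { throw IllegalStateException("boom") }.join()

    // async: the exception is stored in the Deferred and rethrown by await()
    val deferred = scope.async<Int> { throw ArithmeticException("div") }
    try {
        deferred.await()
    } catch (e: ArithmeticException) {
        log += "caught: ${e.javaClass.simpleName}"
    }
    log
}

fun main() {
    println(exceptionDemo())
    // [handler: IllegalStateException, caught: ArithmeticException]
}
```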

Note: this is only a preliminary study of coroutines; questions are welcome and can be posted for discussion.

Reference: Kotlin Chinese station, CSDN: Kotlin coroutine in detail
