Note: in this article, "coroutine" always means a Kotlin coroutine.

The previous article covered suspend functions in coroutines: it introduced the Continuation interface and the CPS transformation, and walked through the whole process of a suspend function from suspension to resumption.

AbstractCoroutine, the base class of coroutine implementations, implements three interfaces: Continuation, Job, and CoroutineScope. Here is a summary of their roles:

  • Continuation: Hides callbacks so that asynchronous code can be written in a sequential, synchronous style
  • Job: Controls the life cycle of coroutines and supports structured concurrency between coroutines
  • CoroutineScope: Delineates the scope of coroutines

What is a Job?

A Job (literally, a task) gives a coroutine cancellation, a life cycle, and structured concurrency. In everyday use the most important of these are cancellation and structured concurrency. In Android development in particular, coroutines are commonly cancelled automatically along with a component's lifecycle.

Job life cycle

The life cycle of a Job is divided into six states: New, Active, Completing, Cancelling, Cancelled, and Completed. Code outside the coroutine normally sees only the Job interface: the caller that started the coroutine holds the Job as a reference. The interface exposes three properties, isActive, isCompleted, and isCancelled, through which the outside world can observe the Job's internal state. The three properties map onto the six states as follows: isActive is true only in Active and Completing; isCompleted is true only in Cancelled and Completed; isCancelled is true only in Cancelling and Cancelled.
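The mapping between states and properties can be observed directly. The sketch below is a minimal illustration, assuming kotlinx-coroutines-core is on the classpath. The helper names flags and observeLifecycle are made up for this example, and only states that can be sampled deterministically from outside (New, Active, Completed, Cancelled) are recorded.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: snapshot of (isActive, isCompleted, isCancelled).
fun flags(job: Job) = Triple(job.isActive, job.isCompleted, job.isCancelled)

// Hypothetical helper: records the flags of a job in four observable states.
fun observeLifecycle(): Map<String, Triple<Boolean, Boolean, Boolean>> = runBlocking {
    val seen = linkedMapOf<String, Triple<Boolean, Boolean, Boolean>>()

    val lazyJob = launch(start = CoroutineStart.LAZY) { delay(10) }
    seen["New"] = flags(lazyJob)          // created lazily, not started yet
    lazyJob.start()
    seen["Active"] = flags(lazyJob)       // started, body not finished
    lazyJob.join()
    seen["Completed"] = flags(lazyJob)    // finished normally

    val doomed = launch { delay(Long.MAX_VALUE) }
    doomed.cancelAndJoin()
    seen["Cancelled"] = flags(doomed)     // cancelled and fully finished

    seen
}

fun main() {
    observeLifecycle().forEach { (state, f) -> println("$state -> $f") }
}
```

Each printed triple is in the order (isActive, isCompleted, isCancelled). The transient Completing and Cancelling states are left out because sampling them reliably requires a job that is still waiting for children.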

Let’s use three examples to illustrate how the Job life cycle works

New => Active => Completed

    val job = launch(start = CoroutineStart.LAZY) {
        println("Active")   
    }
    println("New")  
    job.join()
    println("Completed")    
  1. A coroutine created in lazy mode starts in the New state
  2. When join is called on the job, the coroutine enters the Active state and runs its body
  3. When the body finishes there are no child coroutines to wait for, so the coroutine goes directly to the Completed state

ParentActive => ChildActive => ParentCompleting => ChildCompleted => ParentCompleted

val parent = launch {
    println("ParentActive")
    val child = launch {
        println("ChildActive")
    }
    child.invokeOnCompletion {
        println("ChildCompleted")
    }
    println("ParentCompleting")
}
parent.invokeOnCompletion {
    println("ParentCompleted")
}
  1. invokeOnCompletion registers a callback that fires when the coroutine reaches the Completed or Cancelled state
  2. The parent coroutine is started and enters the Active state
  3. While running, the parent coroutine starts the child. The child coroutine enters the Active state and begins executing its own body
  4. The parent coroutine finishes its own code and enters the Completing state, where it waits for the child coroutine to finish
  5. The child coroutine finishes and enters the Completed state, and the parent coroutine then enters the Completed state as well
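The Completing phase described in steps 4 and 5 can be made visible by recording events in the order they happen. This is a minimal sketch, assuming kotlinx-coroutines-core and runBlocking as the enclosing scope. The function name parentWaitsForChild and the event strings are illustrative only.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the order in which parent and child finish.
fun parentWaitsForChild(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val parent = launch {
        launch {
            delay(50)                      // child is still running when the parent body ends
            events += "child done"
        }
        events += "parent body done"       // parent now moves to Completing
    }
    // Fires only once the parent reaches its final state.
    parent.invokeOnCompletion { events += "parent completed" }
    parent.join()
    events
}

fun main() {
    println(parentWaitsForChild())
    // [parent body done, child done, parent completed]
}
```

The parent's completion handler runs after the child finishes, even though the parent's own body ended first.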

ParentActive => ChildActive => ParentCancelling => ChildCancelled => ParentCancelled

val parent = launch {
    println("ParentActive")
    val child = launch {
        println("ChildActive")
    }
    child.invokeOnCompletion {
        println("ChildCancelled")
    }
    cancel()
    println("ParentCancelling")
}
parent.invokeOnCompletion {
    println("ParentCancelled")
}
  1. Coroutine cancellation relies on CancellationException, which is treated as normal cancellation rather than as a failure
  2. The parent coroutine is started and enters the Active state
  3. The parent coroutine starts the child coroutine, and the child coroutine enters the Active state
  4. cancel is called before the parent coroutine finishes executing, which cancels the parent and then passes the cancellation event to all of its children
  5. The parent coroutine enters the Cancelling state and waits for the child coroutine to finish
  6. The child coroutine enters the Cancelled state, and then the parent coroutine enters the Cancelled state

The cancellation of the Job

Cancellation of a coroutine is triggered either by calling the cancel function or by throwing an exception. When a coroutine is cancelled with cancel, the cancellation event is passed from the parent to its child coroutines, which are cancelled in turn. This is a recursive process. When cancellation is caused by a thrown exception, the cancellation event propagates in both directions: up to the parent and down to the children. Cancellation can therefore be discussed along two dimensions:

  • Cancellation triggered by cancel
  • Cancellation triggered by an exception that is not a CancellationException

The cancellation event triggered by cancel

Consider the case where cancel is called inside a running coroutine. The overall call chain is as follows:

CoroutineScope.cancel(cause: CancellationException?)
-> JobSupport.cancel(cause: CancellationException?)    // 1
-> JobSupport.cancelInternal(cause: Throwable)
-> JobSupport.cancelImpl(cause: Any?)    // 2
-> JobSupport.makeCancelling(cause: Any?)    // 3
-> JobSupport.tryMakeCancelling(state: Incomplete, rootCause: Throwable)    // 4
-> JobSupport.notifyCancelling(list: NodeList, cause: Throwable)    // 5

The numbered functions are analyzed one by one below:

Mark 1

public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

Cancelling a coroutine through the cancel function relies on passing a CancellationException. If the caller does not pass one, a default CancellationException is created.
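The effect of the cause parameter can be checked from outside. This is a minimal sketch, assuming kotlinx-coroutines-core. It uses a standalone Job() with no body and no children so that completion handlers run synchronously inside cancel. The function name observedCause is made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the cause that a completion handler observes
// after job.cancel(cause) is called.
fun observedCause(cause: CancellationException?): Throwable? {
    val job = Job()                        // standalone Job: no body, no children
    var observed: Throwable? = null
    job.invokeOnCompletion { observed = it }
    job.cancel(cause)                      // null means "use the default exception"
    return observed
}

fun main() {
    // No cause passed: a default CancellationException is created.
    println(observedCause(null) is CancellationException)                    // true
    // Explicit cause: the handler sees the message that was passed in.
    println(observedCause(CancellationException("screen closed"))?.message)  // screen closed
}
```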

Mark 2

// cause is Throwable or ParentJob when cancelChild was invoked
// returns true is exception was handled, false otherwise
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
        // completing and had recorded exception
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

finalState starts out as COMPLETING_ALREADY. onCancelComplete is true only for jobs that have no "body block", that is, no code of their own to execute. A coroutine with a body is not such a job, so we skip that branch. makeCancelling(cause) is called next; see mark 3.

Mark 3

    // transitions to Cancelling state
    // cause is Throwable or ParentJob when cancelChild was invoked
    // It contains a loop and never returns COMPLETING_RETRY, can return
    // COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
    // COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
    // TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
    // final state -- when completed, for call to afterCompletion
    private fun makeCancelling(cause: Any?): Any? {
        var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
        loopOnState { state ->
            when (state) {
                is Finishing -> { // already finishing -- collect exceptions
                    val notifyRootCause = synchronized(state) {
                        if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
                        // add exception, do nothing is parent is cancelling child that is already being cancelled
                        val wasCancelling = state.isCancelling // will notify if was not cancelling
                        // Materialize missing exception if it is the first exception (otherwise -- don't)
                        if (cause != null || !wasCancelling) {
                            val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                            state.addExceptionLocked(causeException)
                        }
                        // take cause for notification if was not in cancelling state before
                        state.rootCause.takeIf { !wasCancelling }
                    }
                    notifyRootCause?.let { notifyCancelling(state.list, it) }
                    return COMPLETING_ALREADY
                }
                is Incomplete -> {
                    // Not yet finishing -- try to make it cancelling
                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                    if (state.isActive) {
                        // active state becomes cancelling
                        if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
                    } else {
                        // non active state starts completing
                        val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
                        when {
                            finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
                            finalState === COMPLETING_RETRY -> return@loopOnState
                            else -> return finalState
                        }
                    }
                }
                else -> return TOO_LATE_TO_CANCEL // already complete
            }
        }
    }
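The loopOnState call that wraps the body of makeCancelling is a read-and-retry loop over an atomic state field. The following is a minimal sketch of that pattern in plain Kotlin, not the library's code. TinyJob and its three states are hypothetical simplifications introduced only for this example.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified states; JobSupport's real state objects are richer.
sealed class State
object Active : State()
data class Cancelling(val cause: Throwable) : State()
object Completed : State()

// Hypothetical miniature job that only models the Active -> Cancelling transition.
class TinyJob {
    private val state = AtomicReference<State>(Active)

    val current: State get() = state.get()

    // Returns true if this call moved the job to Cancelling, false otherwise.
    fun cancel(cause: Throwable): Boolean {
        while (true) {                        // plays the role of loopOnState
            when (val s = state.get()) {      // read a snapshot of the state
                is Active -> {
                    // The CAS fails if another thread changed the state after our read;
                    // in that case we loop and re-read instead of overwriting.
                    if (state.compareAndSet(s, Cancelling(cause))) return true
                }
                is Cancelling -> return false // someone else is already cancelling
                is Completed -> return false  // too late to cancel
            }
        }
    }

    fun complete(): Boolean = state.compareAndSet(Active, Completed)
}

fun main() {
    val job = TinyJob()
    println(job.cancel(IllegalStateException("first")))  // true
    println(job.cancel(IllegalStateException("second"))) // false
}
```

The design point is that a failed compareAndSet never loses an update: the loop simply observes whatever state the other thread installed and decides again.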

Calling this method moves the Job into the Cancelling state. First, note loopOnState, which appears frequently in JobSupport, the implementation class of the Job interface. Because several threads may try to modify the state at the same time, JobSupport reads the state in a loop: an update made with compareAndSet, such as _state.compareAndSet(state, cancelling), may fail, in which case the new state must be read and the operation retried. Before following the branches on state, we need to pause and look at how JobSupport represents states internally. JobSupport implements isActive, isCompleted, and isCancelled as follows:

    public override val isActive: Boolean get() {
        val state = this.state
        return state is Incomplete && state.isActive
    }

    public final override val isCompleted: Boolean get() = state !is Incomplete

    public final override val isCancelled: Boolean get() {
        val state = this.state
        return state is CompletedExceptionally || (state is Finishing && state.isCancelling)
    }

From this code it follows that, in JobSupport, the state is determined entirely by three types: Incomplete, CompletedExceptionally, and Finishing.

  • If the class of the state object implements the Incomplete interface, the coroutine has not completed yet
  • If the class of the state object inherits from CompletedExceptionally, the state can represent a cancelled coroutine
  • If the state object is an instance of Finishing, the coroutine is either being cancelled or waiting for its child coroutines to finish executing

Back in makeCancelling, the line val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it } obtains the cause of the cancellation. Here is the createCauseException method:

    // cause is Throwable or ParentJob when cancelChild was invoked
    private fun createCauseException(cause: Any?): Throwable = when (cause) {
        is Throwable? -> cause ?: defaultCancellationException()
        else -> (cause as ParentJob).getChildJobCancellationCause()
    }

When cause is an ordinary Throwable (or null), the method returns the exception itself, or a default CancellationException if it is null. When cause is a ParentJob, the method calls getChildJobCancellationCause on it. In other words, the child coroutine calls a method on the parent coroutine to obtain the parent's cancellation cause, which is how the reason for the parent's cancellation is passed to the child. Once the cause has been obtained, tryMakeCancelling is executed; see mark 4.

Mark 4

    // try make new Cancelling state on the condition that we're still in the expected state
    private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
        // 1
        assert { state !is Finishing } // only for non-finishing states
        assert { state.isActive } // only for active states
        // get state's list or else promote to list to correctly operate on child lists
        // 2
        val list = getOrPromoteCancellingList(state) ?: return false
        // Create cancelling state (with rootCause!)
        val cancelling = Finishing(list, false, rootCause)
        if (!_state.compareAndSet(state, cancelling)) return false
        // 3
        // Notify listeners
        notifyCancelling(list, rootCause)
        return true
    }

The tryMakeCancelling method works in three steps, as numbered in the code:

  1. Assert that the job is in the expected state (active and not yet finishing)
  2. Promote the state to a NodeList (a NodeList stores a series of JobNodes, which include the references to child coroutines and can be used to cancel them), then atomically replace the state with a Finishing object that represents Cancelling
  3. Notify listeners of the cancellation

See mark 5 for a detailed analysis of notifyCancelling.

Mark 5

private fun notifyCancelling(list: NodeList, cause: Throwable) {
        // first cancel our own children
        onCancelling(cause)
        notifyHandlers<JobCancellingNode<*>>(list, cause)
        // then cancel parent
        cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
    }

The onCancelling method has an empty implementation in JobSupport. Its only override is in ActorCoroutine, which is not analyzed here. Next, notifyHandlers and cancelParent notify the child coroutines and the parent coroutine of the cancellation, respectively. Notifying the parent has no practical effect in this case, because the cause is a CancellationException.

    private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
        var exception: Throwable? = null
        list.forEach<T> { node ->
            try {
                node.invoke(cause)
            } catch (ex: Throwable) {
                exception?.apply { addSuppressedThrowable(ex) } ?: run {
                    exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
                }
            }
        }
        exception?.let { handleOnCompletionException(it) }
    }

notifyHandlers walks through the nodes of the NodeList and calls the invoke method of each node. In this example the node's class is ChildHandleNode, whose invoke calls the child's parentCancelled(parentJob: ParentJob) method, which is also implemented by JobSupport. That method in turn calls cancelImpl, so each child recursively calls cancelImpl until all descendants have been cancelled.
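The direction of this propagation can be observed without launching any coroutine. This is a minimal sketch, assuming kotlinx-coroutines-core. It builds a three-level hierarchy from standalone jobs with the Job(parent) factory. The function names cancelFromTop and cancelFromMiddle are illustrative only.

```kotlin
import kotlinx.coroutines.Job

// Cancel the root of a three-level hierarchy and report isCancelled per level
// in the order (parent, child, grandchild).
fun cancelFromTop(): List<Boolean> {
    val parent = Job()
    val child = Job(parent)
    val grandchild = Job(child)
    parent.cancel()
    return listOf(parent.isCancelled, child.isCancelled, grandchild.isCancelled)
}

// Cancel only the middle level: with a CancellationException as the cause,
// the event travels down to the grandchild but does not cancel the parent.
fun cancelFromMiddle(): List<Boolean> {
    val parent = Job()
    val child = Job(parent)
    val grandchild = Job(child)
    child.cancel()
    return listOf(parent.isCancelled, child.isCancelled, grandchild.isCancelled)
}

fun main() {
    println(cancelFromTop())    // [true, true, true]
    println(cancelFromMiddle()) // [false, true, true]
}
```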

Cancellation event triggered by an exception

Cancellation caused by an exception propagates both up to the parent and down to the children. Here is an example:

    val parent = launch {
        println("ParentActive")
        val child = launch {
            println("before cancel")
            throw Exception("cancel")
            println("after cancel")
        }
        child.invokeOnCompletion {
            println("ChildCancelled")
        }
        child.join()
    }
    parent.invokeOnCompletion {
        println("ParentCancelled")
    }

The output of this code is as follows:

ParentActive
before cancel
ChildCancelled
ParentCancelled

Cancellation triggered by an exception works much like cancellation triggered by cancel. To understand the whole process we need BaseContinuationImpl, which was described in the previous article. Here is an excerpt of the relevant code:

 val outcome: Result<Any?> = try {
         val outcome = invokeSuspend(param)
         if (outcome === COROUTINE_SUSPENDED) return
         Result.success(outcome)
     } catch (exception: Throwable) {
         Result.failure(exception)
     }
 releaseIntercepted() // this state machine instance is terminating
 if (completion is BaseContinuationImpl) {
     // unrolling recursion via loop
     current = completion
     param = outcome
 } else {
     // top-level completion reached -- invoke and return
     completion.resumeWith(outcome)
     return
 }

invokeSuspend is where the coroutine's logic actually runs. If an exception is thrown inside invokeSuspend, execution of that code stops, the exception is wrapped in Result.failure, and AbstractCoroutine's resumeWith method is eventually called. The call chain is as follows:

AbstractCoroutine.resumeWith(result: Result<T>)
-> JobSupport.makeCompletingOnce(proposedUpdate: Any?)
-> JobSupport.tryMakeCompleting(state: Any?, proposedUpdate: Any?)
-> JobSupport.tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?)
-> JobSupport.notifyCancelling(list: NodeList, cause: Throwable)

After this series of calls we are back in JobSupport's notifyCancelling method. notifyHandlers has already been analyzed in detail and is not discussed again here. We focus on cancelParent:

    /**
     * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
     * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
     *
     * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
     * may leak to the [CoroutineExceptionHandler].
     */
    private fun cancelParent(cause: Throwable): Boolean {
        // Is scoped coroutine -- don't propagate, will be rethrown
        if (isScopedCoroutine) return true

        /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
         * This allow parent to cancel its children (normally) without being cancelled itself, unless
         * child crashes and produce some other exception during its completion.
         */
        val isCancellation = cause is CancellationException
        val parent = parentHandle
        // No parent -- ignore CE, report other exceptions.
        if (parent === null || parent === NonDisposableHandle) {
            return isCancellation
        }

        // Notify parent but don't forget to check cancellation
        return parent.childCancelled(cause) || isCancellation
    }

cancelParent first checks whether there is a parent coroutine. If there is, it calls the parent's childCancelled method, implemented in JobSupport, where the parent ignores a CancellationException and acts only on other exceptions. For those, the call ends up in the parent coroutine's cancelImpl method. From that point on the process is the same as cancellation triggered by cancel, except that the exception being passed is not a CancellationException.
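The two-way propagation can be checked with a failing child and a healthy sibling. This is a minimal sketch, assuming kotlinx-coroutines-core. It runs the coroutines in a separate CoroutineScope with a CoroutineExceptionHandler, used here only so that the uncaught exception is captured instead of reaching the thread's default handler. The function name failurePropagation is made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns (parent cancelled?, sibling cancelled?, handled message).
fun failurePropagation(): Triple<Boolean, Boolean, String?> = runBlocking {
    var handled: String? = null
    val handler = CoroutineExceptionHandler { _, e -> handled = e.message }
    val scope = CoroutineScope(Job() + handler)

    lateinit var sibling: Job
    val parent = scope.launch {
        sibling = launch { delay(Long.MAX_VALUE) }      // healthy sibling, would run forever
        launch { throw IllegalStateException("boom") }  // failing child
    }
    parent.join()                                       // returns once parent and children finish

    Triple(parent.isCancelled, sibling.isCancelled, handled)
}

fun main() {
    println(failurePropagation()) // (true, true, boom)
}
```

The failing child cancels its parent (upward), the parent cancels the sibling (downward), and the original exception is reported once at the root coroutine.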

Does cancel really terminate execution?

Let's look at the following code and what it prints when run:

val parent = launch {
        println("ParentActive")
        val child = launch {
            println("ChildActive")
        }
        child.invokeOnCompletion {
            println("ChildCancelled")
        }
        println("parent: before cancel")
        cancel()
        println("parent: after cancel")
    }
    parent.invokeOnCompletion {
        println("ParentCancelled")
    }

The output from this code is:

ParentActive
parent: before cancel
parent: after cancel
ChildCancelled
ParentCancelled

Why is "parent: after cancel" still printed even though cancel() was called before it? As discussed in the previous article on continuations, all of the coroutine's logic lives inside invokeSuspend, and the call to cancel() and the printing of "parent: after cancel" sit at the same level within it. After cancel() the Job state of the coroutine has been changed successfully, but execution of the coroutine's code has not been stopped. How can execution actually be stopped? Use a suspending function that supports cancellation, such as yield() or delay(): calling yield() right after cancel() checks whether the coroutine has been cancelled. How yield performs this check is not covered in detail here. In this scenario it finds that the state is a Finishing object marked as cancelling and throws a CancellationException, which ends execution of the code.
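The difference between changing the Job state and stopping the code can be reproduced in a few lines. This is a minimal sketch, assuming kotlinx-coroutines-core and runBlocking as the enclosing scope. The function name cooperativeCancel and the log strings are illustrative only.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the lines that were reached before execution stopped.
fun cooperativeCancel(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        log += "before cancel"
        cancel()               // only changes the Job state to Cancelling
        log += "after cancel"  // still runs: nothing has checked the state yet
        yield()                // checks the state and throws CancellationException
        log += "after yield"   // never reached
    }
    job.join()
    log
}

fun main() {
    println(cooperativeCancel()) // [before cancel, after cancel]
}
```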

Conclusion

This article covered the implementation of the Job (task) part of Kotlin coroutines, explaining the two most important aspects of the Job interface: its life cycle and the propagation of cancellation events. How the Job interface achieves structured concurrency will be covered in more detail in the next article.