The concept of coroutines dates back to 1958, much earlier than threading, and many languages now support it natively. Java has no native coroutines, although large companies have built coroutine support themselves or rely on third-party libraries. Kotlin supports coroutines natively.

I think the core of coroutines comes down to one word: scope. Once you understand scope, you understand coroutines

What is a coroutine

Coroutines are cooperative tasks and threads are preemptive tasks; both are forms of concurrency

Are Kotlin coroutines just a thread library rather than real coroutines, given that thread pools do the work internally?

  1. Go, the best-known coroutine language, also maintains threads internally. Does that mean it has no coroutines?
  2. Coroutines exist to make asynchronous code convenient for developers, while threads improve performance. Neither replaces the other: using one does not mean you no longer need the other
  3. Coroutines are a concept, regardless of how they are implemented
  4. The coroutines in the Kotlin standard library contain no thread pool code; only the extension library (kotlinx.coroutines) handles thread pools internally

Design origins

  1. Kotlin’s coroutines closely follow the coroutine design of Google’s Go (scope/channel/select), but the scope is instantiated as an object, which gives better control over the scope lifecycle
  2. The await pattern (JavaScript’s solution for asynchronous tasks)
  3. Flow, for which Kotlin drew on the reactive framework RxJava
  4. With coroutines you do not have to deal with threads directly; you only choose the dispatcher suited to each scenario, and each dispatcher is optimized for a specific kind of task

Features

Usage scenarios

Suppose the home page needs seven network requests (because the back end handles things poorly). Issuing them serially, one by one, is nearly seven times slower than issuing them concurrently
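The difference can be sketched with simulated requests. This is only an illustration: `fakeRequest` is a hypothetical stand-in that suspends for 100 ms instead of calling a real interface, and the two helper functions are names made up for this sketch.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for one network request; it only suspends for 100 ms.
suspend fun fakeRequest(id: Int): Int {
    delay(100)
    return id
}

// Runs the seven requests one after another and returns the elapsed milliseconds.
fun serialMillis(): Long = runBlocking {
    measureTimeMillis {
        for (id in 1..7) fakeRequest(id)
    }
}

// Starts all seven requests with async, then waits for all of them together.
fun concurrentMillis(): Long = runBlocking {
    measureTimeMillis {
        (1..7).map { id -> async { fakeRequest(id) } }.awaitAll()
    }
}
```

With these simulated delays the serial version needs at least 700 ms, while the concurrent version finishes in roughly the time of a single request.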

Today’s computers run on multi-core CPUs, so mastering concurrent programming is the way forward

Advantages of coroutines

  1. Concurrency is convenient to implement
  2. No nested callbacks, so the code structure stays clear
  3. Creating a coroutine costs less than creating a thread. One thread can run many coroutines, so even a single thread can work asynchronously

Experimental features

Coroutines were officially released in Kotlin 1.3, but some functions are still unstable (this does not affect project development). They are identified by annotations

@FlowPreview // The API may change in later versions
@ExperimentalCoroutinesApi // The function may still be unstable
@ObsoleteCoroutinesApi // The function may be deprecated and removed

Components

Kotlin’s coroutines consist of three main components

  1. CoroutineScope (coroutine scope): every coroutine body has a scope, which determines whether the code runs asynchronously or synchronously
  2. Channel: data is sent and received through a channel, passing data between coroutines or controlling suspension and resumption
  3. Flow (reactive stream): structurally similar to RxJava

For automated/concurrent network requests I created a library that I’ll just call Android’s strongest network request library: Net

Version 1.0+ is implemented with RxJava and version 2.0+ with coroutines; it also includes a stronger polling feature that replaces the RxJava one

Because dropping RxJava also meant replacing RxBus for event distribution, I used coroutines to create something more powerful: Channel

Our company’s projects use MVVM + Kotlin + Coroutine + JetPack, an architecture that is common abroad.

  1. Compact, 70% less code
  2. Bidirectional data binding
  3. Concurrent asynchronous tasks (network) multiply the speed
  4. More robust data saving and recovery

Frameworks I rely on in everyday project development

Framework     Description
Net           Not the most powerful Android network request/asynchronous task library
BRV           Not the strongest Android list library
Serialize     Creates fields that are saved and restored automatically
StateLayout   Not the strongest Android default (placeholder) page library
LogCat        Log printing tool for JSON and long text
Tooltip       A polished toast tool
DebugKit      Debugging window tool for development
StatusBar     Creates a transparent status bar in one line of code
Channel       Event distribution framework based on coroutines and JetPack features

Outlook

Coroutines have great advantages for high-concurrency back ends. Most Google Jetpack libraries now provide coroutine extensions, and the most obvious benefit is that concurrent network requests multiply the speed. The code structure also becomes clearer. This article will be updated as coroutines evolve across Kotlin versions

Dependencies

Here we use the coroutine extension library. The coroutines in the Kotlin standard library are too bare-bones for developers to use directly

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9"

Creating coroutines

There are three ways to start a top-level coroutine

GlobalScope: the lifecycle is the same as the app, the scope cannot be cancelled (it has no Job), and the thread is not blocked

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // Start a new coroutine in the background and continue
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // Prevent the JVM from exiting
}

GlobalScope itself has no Job, but every coroutine started with launch does. GlobalScope is a scope in its own right, and each launch creates a child scope of it

CoroutineScope: the thread is not blocked, the scope can be cancelled, and the coroutine lifecycle is controlled through its CoroutineContext

import kotlinx.coroutines.*

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}

runBlocking: blocks the thread, which suits unit tests because no sleep is needed to keep the JVM from exiting. runBlocking is a top-level function that can be called anywhere

Normally we would not use runBlocking in a project, because there is no point in starting a coroutine that blocks the main thread

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Block the thread until all coroutines within the scope have executed
}

Creating a scope

Further coroutine scopes can be created inside a coroutine by calling functions. The creation functions can only be called in two places:

  1. Inside a CoroutineScope: only within a scope can other scopes be created
  2. Inside a function marked with suspend
  3. A coroutine always waits until all coroutines in its inner scopes have finished before it completes

Child coroutine scopes can also be created inside the main coroutine. There are two kinds of creation functions

  1. Blocking scope (serial): blocks the current scope

  2. Suspending scope (concurrent): does not block the current scope

Synchronous scope functions

All of them are suspend functions

  • withContext: can switch the dispatcher and returns a result
  • coroutineScope: creates a coroutine scope that blocks the current scope and waits for its child coroutines to finish before resuming
  • supervisorScope: a coroutineScope backed by a SupervisorJob; an exception in a child coroutine does not cancel the parent coroutine

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T
// Returns a result; can interact with the parent of the current coroutine; mainly used to switch dispatchers back and forth

public suspend inline operator fun <T> CoroutineDispatcher.invoke(
    noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// Merely a convenience wrapper around withContext

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
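To make the behavior of these functions concrete, here is a small sketch that records the order of execution. The function name `scopeOrderDemo` and the step labels are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the order in which the steps ran.
fun scopeOrderDemo(): List<String> = runBlocking {
    val steps = mutableListOf<String>()

    // withContext switches the dispatcher and hands its result back to the caller.
    val sum = withContext(Dispatchers.Default) { 1 + 2 }
    steps += "withContext returned $sum"

    // coroutineScope suspends here until every child launched inside has finished.
    coroutineScope {
        launch {
            delay(50)
            steps += "child finished"
        }
        steps += "scope body finished"
    }
    steps += "after coroutineScope"
    steps
}
```

The last step is only recorded after the child has finished, even though the body of coroutineScope itself returned earlier.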

Asynchronous scope functions

Neither function is a suspend function; they only need to be called on a CoroutineScope

  • launch: asynchronous and concurrent, returns no result
  • async: asynchronous and concurrent, returns a result

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Concurrency

Code inside a single coroutine scope executes in order. This suits serial network requests where one asynchronous task needs the result of the previous one.

Dispatching a coroutine takes time, so the body of an asynchronous coroutine always runs later than the synchronous code that follows it

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(main.kt:34)")
    }

    System.err.println("(main.kt:37)")
}

Concurrent tasks can be created by calling the async function inside a coroutine scope

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Example

fun main() = runBlocking<Unit> {
    val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35)    result = ${name.await() + title.await()}")
    delay(2000)
}

  1. async returns a Deferred object; the result value is obtained through the await function
  2. A collection of Deferred objects can use awaitAll() to wait for all of them to finish
  3. Tasks on which await is never called are still waited for before the coroutine completes
  4. If await is never called on a Deferred, an exception thrown inside async is not printed to Logcat and cannot be caught with try/catch, yet it still cancels the scope and causes a crash. The exception is rethrown when await is called

Lazy concurrency

Set the start parameter of the async function to CoroutineStart.LAZY and the async task starts only when start or await is called on the Deferred object.
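A minimal sketch of the lazy behavior: the task body flips a flag when it runs, so the flag shows whether the task had started before await was called. The function name `lazyAsyncDemo` is made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns (had the task started before await?, awaited value).
fun lazyAsyncDemo(): Pair<Boolean, Int> = runBlocking {
    var started = false
    val deferred = async(start = CoroutineStart.LAZY) {
        started = true
        42
    }
    delay(100)                        // plenty of time, yet the lazy task has not run
    val startedBeforeAwait = started
    val value = deferred.await()      // await() starts the task and waits for its result
    startedBeforeAwait to value
}
```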

Start modes

  1. DEFAULT: executed immediately
  2. LAZY: the Job is not executed until start or join is called
  3. ATOMIC: cannot be cancelled before the coroutine body begins execution
  4. UNDISPATCHED: not dispatched at first; it runs directly in the current thread until the first suspension point, and after that it resumes according to the dispatcher in its context

Exceptions

If an exception occurs in a coroutine, its parent coroutine is cancelled, and all other child coroutines of that parent are cancelled too
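The propagation can be observed with a small sketch. It assumes an independent scope built from `Job()` plus a CoroutineExceptionHandler so that the failure does not reach `runBlocking`; the function name and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

fun failurePropagationDemo(): List<String> = runBlocking {
    val events = CopyOnWriteArrayList<String>()    // written from pool threads
    val handler = CoroutineExceptionHandler { _, e -> events.add("handler got ${e.message}") }
    val scope = CoroutineScope(Job() + handler)    // independent of runBlocking

    val parent = scope.launch {
        launch {
            try {
                delay(Long.MAX_VALUE)              // would wait forever if not cancelled
            } finally {
                events.add("sibling cancelled")
            }
        }
        launch {
            delay(100)
            throw IllegalStateException("boom")    // fails the parent, which cancels the sibling
        }
    }
    parent.join()
    events.add("parent cancelled = ${parent.isCancelled}")
    events.toList()
}
```

The sibling never finishes its delay: it is cancelled because the other child failed, and the parent ends up cancelled as well.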

Deferred

Inherits from Job

A top-level function creates a CompletableDeferred object, which lets you implement custom Deferred behavior

public suspend fun await(): T
// Suspend until the result is available and return it
public val onAwait: SelectClause1<T>
// Used in select

public fun getCompleted(): T
// Return the result if [isCompleted], otherwise throw an exception
public fun getCompletionExceptionOrNull(): Throwable?
// If [isCompleted], return the exception the task failed with, or null if it succeeded; otherwise throw an exception

Example

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()

    launch {
        delay(1000)
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72)    result = ${deferred.await()}")
}

Top-level functions that create a CompletableDeferred

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>

CompletableDeferred functions

public fun complete(value: T): Boolean
// Complete with a result

public fun completeExceptionally(exception: Throwable): Boolean
// Complete with an exception, which is thrown at `await()`

public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean
// Complete with a Result, which carries either the value or the exception

CoroutineScope

Creating this object means creating a coroutine scope

Structured concurrency

You will see this term a lot in coroutine tutorials. It refers to starting a new coroutine inside a scope: the parent coroutine bounds the lifecycle of the child coroutine, and the child inherits the context of the parent. This hierarchy is called structured concurrency

Starting several child coroutines in one coroutine scope produces concurrent behavior

CoroutineContext

CoroutineContext is the coroutine context. I think of it as a context object that holds the basic information about a coroutine; it determines, for example, the coroutine’s name and how the coroutine runs

Creating a new dispatcher

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher

Creating a new dispatcher is resource-intensive. It is recommended to reuse it and to release it with the close function when it is no longer needed
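One way to make sure the dispatcher is released is Kotlin’s `use`, which works because ExecutorCoroutineDispatcher is Closeable. A minimal sketch; the thread name "worker" and the function name are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.*

// Runs a block on a dedicated thread and returns the name of that thread.
fun dedicatedThreadDemo(): String =
    // "worker" is an arbitrary name chosen for this sketch.
    newSingleThreadContext("worker").use { dispatcher ->
        runBlocking {
            withContext(dispatcher) { Thread.currentThread().name }
        }
    }   // use() calls close() here, even if the block throws
```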

Dispatchers

A dispatcher is a CoroutineContext element. The Dispatchers object provides four implementations, each representing a different kind of thread scheduling. When a function is not given a dispatcher, it inherits the dispatcher of the current scope

  1. Dispatchers.Unconfined: no thread is specified; if a child coroutine switches threads, the code after it runs on that thread
  2. Dispatchers.IO: suited to IO reads and writes
  3. Dispatchers.Main: depends on the platform; on Android it is the main thread
  4. Dispatchers.Default: the default dispatcher, which executes coroutine bodies in a thread pool and suits computation

Immediate execution

Dispatchers.Main.immediate

immediate is a property of the main dispatcher. It means that if the code is already running on that dispatcher, the coroutine body runs right away without another dispatch; you can think of it as a synchronous coroutine scope within the same dispatcher

For example, a scope opened with launch normally runs after the code that follows the launch call, but a coroutine using this property runs in sequential order

Example

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
    // Execute sequence 1
}

// Execute sequence 2

CoroutineScope(Job() + Dispatchers.Main).launch {
    // Execute sequence 4
}

// Execute sequence 3

Naming coroutines

Create a CoroutineName object, passing the name to its constructor, and supply it as context. CoroutineName also implements CoroutineContext.

launch(CoroutineName("my-coroutine")) { }

Coroutine names are used for debugging purposes
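The name can be read back from the context inside the coroutine, as this small sketch shows. The name "loader" and the function name are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the name the coroutine sees in its own context.
fun coroutineNameDemo(): String? = runBlocking {
    // "loader" is an arbitrary name chosen for this sketch.
    val deferred = async(CoroutineName("loader")) {
        coroutineContext[CoroutineName]?.name
    }
    deferred.await()
}
```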

Suspending coroutines

yield suspends the current coroutine so that other coroutines can run. If no other coroutine is waiting to run, the current coroutine simply continues (the call has no effect)

public suspend fun yield(): Unit
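A small sketch of two coroutines taking turns through yield. Both run on the single thread of `runBlocking`, so the order is predictable; the labels A1, B1 and so on are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the order in which the two coroutines recorded their steps.
fun yieldDemo(): List<String> = runBlocking {
    val order = mutableListOf<String>()
    val a = launch {
        order += "A1"
        yield()          // give the other coroutine a turn
        order += "A2"
    }
    val b = launch {
        order += "B1"
        yield()
        order += "B2"
    }
    joinAll(a, b)
    order
}
```

Without the yield calls each coroutine would run to the end before the other one started.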

When reading about coroutines you will often come across the word suspend. Suspending can be understood as pausing this piece of code (the scope) and carrying on with the code after it. A suspend function is a function marked with the suspend keyword. Such a function may only be called from another suspend function or from a coroutine; the keyword by itself suspends nothing, it only restricts where developers can call the function

In the IDE, a call to a suspend function is marked with an arrow icon in the gutter next to the line number

Job

In coroutines a Job represents a unit of coroutine work; it also inherits from CoroutineContext

val job = launch {

}

Job is an interface

interface Job : CoroutineContext.Element

Functions

public suspend fun join()
// Suspend the calling coroutine until this job finishes
public val onJoin: SelectClause0
// Used in select, described later

public fun cancel(cause: CancellationException? = null)
// Cancel the coroutine
public suspend fun Job.cancelAndJoin()
// Cancel the coroutine and suspend until it has finished

public fun start(): Boolean
public val children: Sequence<Job>
// All child jobs

public fun getCancellationException(): CancellationException

public fun invokeOnCompletion(
  onCancelling: Boolean = false,
  invokeImmediately: Boolean = true,
  handler: CompletionHandler): DisposableHandle
// onCancelling: if true, the handler is invoked as soon as the job starts cancelling instead of waiting for completion
// invokeImmediately: if true and the job is already in the target state, [handler] is invoked right away before the DisposableHandle is returned

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// When registered on the main coroutine, the handler fires only once its scope has completed; registering it directly on a CoroutineScope has no effect
// A manually raised CancellationException is also passed as cause

State

The current state of a Job can be read through these properties

public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean

Extension functions

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()

Every coroutine scope has a coroutineContext, and the coroutineContext of every coroutine contains a Job object

coroutineContext[Job]
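The three state properties can be observed around a cancellation, as in this minimal sketch. The function name `jobStateDemo` is made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns isActive before cancellation, then isActive, isCancelled, isCompleted after it.
fun jobStateDemo(): List<Boolean> = runBlocking {
    val job = launch { delay(Long.MAX_VALUE) }
    val activeBefore = job.isActive      // true: started and not yet finished
    job.cancelAndJoin()                  // cancel, then wait until the job has completed
    listOf(activeBefore, job.isActive, job.isCancelled, job.isCompleted)
}
```

A cancelled job that has finished is both cancelled and completed, and no longer active.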

Ending coroutines

Cancellation is cooperative: a coroutine that only runs computation (logging included) and never reaches a suspension point cannot be cancelled, whereas one that calls a cancellable suspend function such as delay can.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true) {
      delay(100) // If this line of code exists, the coroutine can be successfully cancelled. If this line of code does not exist, it cannot be cancelled
      System.err.println("(Main.kt:30) ")
    }
  }

  delay(500)
  job.cancel()
  System.err.println("(Main.kt:42) the end")
}

Inside a coroutine, the isActive property tells whether the coroutine should end

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // False once the coroutine is cancelled
            System.err.println("(Main.kt:30) ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42) the end")
}

Release resources

Coroutines can be cancelled manually, but some resources need to be released when a coroutine is cancelled. This can be done in finally

The finally block is always executed

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000) {
                System.err.println("(Main.kt:31)    it = $it")
                delay(500)
            }
        } finally {
            // A cancelled coroutine can no longer suspend here
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42) ")
}

Suspending again after cancellation

withContext combined with NonCancellable lets a coroutine that has already been cancelled suspend again. This usage can be seen as creating a task that cannot be cancelled

withContext(NonCancellable) {
    println("job: I'm running finally")
    delay(1000L)
    println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
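Put into a complete function, the pattern might look like the following sketch, where the cleanup in finally still suspends after the job was cancelled. The function name and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the events in the order they happened.
fun nonCancellableCleanupDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            delay(Long.MAX_VALUE)
        } finally {
            withContext(NonCancellable) {
                delay(100)                 // suspending is allowed again in here
                events += "cleanup done"
            }
        }
    }
    delay(50)
    job.cancelAndJoin()                    // returns only after the cleanup has finished
    events += "joined"
    events
}
```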

Context combination

A coroutine scope can receive several CoroutineContext elements as its context. CoroutineContext itself is an interface, and many context-related classes implement it

Several CoroutineContext elements can be specified at once by combining them with the + operator. An element on the right replaces an element of the same type on the left, so the order of addition matters

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51)    ${Thread.currentThread()}")
}

launch(Dispatchers.IO + CoroutineName("Daniel Wu")) { }

Coroutine local variables

ThreadLocal gives access to thread-local variables, but to use one with coroutines it must be converted into a coroutine context element with the extension function asContextElement and passed in as an argument when the coroutine is created

The local variable takes effect within the scope of the coroutine that holds this context element

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)
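A minimal sketch of a coroutine-local value. `requestId` is a hypothetical thread-local introduced only for this example, and the values "outer" and "inner" are arbitrary.

```kotlin
import kotlinx.coroutines.*

// Hypothetical thread-local used only in this sketch.
val requestId = ThreadLocal<String>()

// Returns (value seen inside the coroutine, value seen by the caller afterwards).
fun threadLocalDemo(): Pair<String?, String?> = runBlocking {
    requestId.set("outer")
    val inside = withContext(Dispatchers.Default + requestId.asContextElement("inner")) {
        requestId.get()              // "inner", whichever pool thread runs this block
    }
    inside to requestId.get()        // the calling thread still holds its own value
}
```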

Timeout

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// The block is cancelled automatically once timeMillis has elapsed;
// If there is no timeout, the return value is returned and the coroutine continues;
// On timeout a TimeoutCancellationException is thrown; being a CancellationException, it does not crash the program

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// Returns null on timeout instead of throwing

TimeoutCancellationException cannot be thrown manually, because its constructor is not public
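Both variants side by side, as a minimal sketch. The durations and the result strings are arbitrary values chosen for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns (result of withTimeoutOrNull, result of withTimeout with try/catch).
fun timeoutDemo(): Pair<String?, String> = runBlocking {
    // The block needs 1 s but only gets 100 ms, so null is returned.
    val orNull: String? = withTimeoutOrNull(100) {
        delay(1_000)
        "finished"
    }
    // The same situation with withTimeout throws instead of returning null.
    val caught = try {
        withTimeout(100) {
            delay(1_000)
            "finished"
        }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }
    orNull to caught
}
```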

Global coroutine scope

The global coroutine scope is a singleton object; the entire JVM has only one instance, and its lifetime follows that of the JVM. Take care to avoid memory leaks when using the global coroutine scope

public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

The global coroutine scope does not inherit the context of a parent coroutine scope, so it is not cancelled when a parent coroutine is cancelled

Start modes

  • DEFAULT: executes the coroutine body immediately
  • ATOMIC: executes the coroutine body immediately, and the coroutine cannot be cancelled before execution begins
  • UNDISPATCHED: executes the coroutine body immediately in the current thread up to the first suspension point; after that it resumes on the thread chosen by the dispatcher in its context
  • LAZY: the coroutine runs only after start or join is called manually

Coroutine cancellation

If the coroutine body never suspends and so cannot be cancelled from outside, it should check isActive to find out whether the coroutine is still active

A custom exception object can be supplied by passing a CancellationException as the argument of the cancel function

Non-cancellable coroutine scope

NonCancellable is a singleton that is passed to the withContext function to create a coroutine scope that cannot be cancelled

withContext(NonCancellable) {
  delay(2000)
}

Example

import kotlinx.coroutines.*

fun main() = runBlocking {
  launch {
    delay(1000)
    System.err.println("(Main.kt:19) ")
  }

  launch {
    withContext(NonCancellable) {
      delay(2000)
      System.err.println("(Main.kt:26) ")
    }
  }

  delay(500) // Prevent the launch from being cancelled before withContext has started
  cancel()
}

  1. When a child scope contains a task that has not finished, cancellation waits for the task to complete (delay is not such a task because it is cancellable; Thread.sleep can simulate an unfinished task)
  2. Throwing a CancellationException ends the coroutine as a cancellation; invokeOnCompletion still runs and receives the exception object as its cause

Cancelling GlobalScope

GlobalScope is the global coroutine scope. The scope itself has no Job, so it cannot be cancelled as a whole. However, every coroutine started from GlobalScope returns a Job, and that Job can be used to cancel the coroutine

Coroutine exceptions

The CoroutineExceptionHandler function creates an object of the same name. The interface inherits from CoroutineContext, so it is passed as a context argument, for example to the global coroutine scope. When the scope throws an exception, the handler’s callback receives it and the exception is not thrown any further

  1. A CoroutineExceptionHandler only takes effect on the outermost parent coroutine, because exceptions propagate upward layer by layer. The exception is a SupervisorJob, which stops upward propagation so that the child’s own handler catches the exceptions of its scope

  2. A CoroutineExceptionHandler does not stop the coroutine scope from being cancelled; it only receives the exception information so that the exception does not crash the JVM

  3. Once an exception occurs, the parent coroutine and all of its child coroutines are cancelled. This is a two-way exception cancellation mechanism

  4. A CoroutineExceptionHandler held by a scope is passed down to child scopes as part of the coroutine context (unless a child scope specifies its own)

Do not try to catch exceptions thrown inside launch by wrapping the launch call in try/catch. They will not be caught.

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}

The rest of this section covers how to catch coroutine exceptions so that they do not crash the program.

Cancellation exceptions

Cancelling a coroutine through its Job throws an exception inside the coroutine. The default exception handler ignores it, but we can see the exception by catching it

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
  val job = GlobalScope.launch {

    try {
      delay(1000)
    } catch (e: Exception) {
      e.printStackTrace()
    }
  }

  job.cancel(CancellationException("A custom exception for cancelling the coroutine"))
  delay(2000)
}

The cancel function of Job

public fun cancel(cause: CancellationException? = null)

  • cause: if the parameter is omitted, a JobCancellationException is used by default

Exception handling in the global coroutine scope

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
       System.err.println("(Main.kt:41):main   coroutineContext = $coroutineContext, throwable = $throwable")
}

GlobalScope.launch(exceptionHandler) {

}

Setting an exception handler on a child coroutine has no effect: the error is still thrown up to the parent coroutine, so the handler is pointless. The exception is a child running under a SupervisorJob, whose errors are not thrown upward and are handled by its own exception handler.
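The difference between try/catch and a handler can be checked with a small sketch. The function name `handlerDemo` and the message "boom" are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicReference

// Returns (did try/catch see the exception?, message received by the handler).
fun handlerDemo(): Pair<Boolean, String?> = runBlocking {
    val seen = AtomicReference<String?>(null)
    val handler = CoroutineExceptionHandler { _, e -> seen.set(e.message) }
    val caughtByTry = try {
        GlobalScope.launch(handler) {
            throw IllegalStateException("boom")
        }.join()                 // join() waits for the job but does not rethrow its failure
        false
    } catch (e: IllegalStateException) {
        true
    }
    caughtByTry to seen.get()
}
```

The try/catch around launch never sees the exception; only the handler in the coroutine context receives it.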

Exception aggregation and unpacking

Coroutines started from the global scope can also nest as parent and child, so one failure may cause several exceptions to be thrown in turn

import kotlinx.coroutines.*
import java.io.IOException

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        // Third: the exception received here is the first exception that was thrown
        println("Caught exception: $exception, suppressed exceptions: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // When the parent is cancelled all of its children are cancelled; finally always runs, on cancellation or on completion
                throw ArithmeticException() // Second: another exception is thrown and gets aggregated
            }
        }
        launch {
            delay(100)
            throw IOException() // First: this exception causes the parent coroutine to be cancelled
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // Keep the JVM from exiting before the GlobalScope coroutine has finished
}

Supervisor job

Normally an exception in a child coroutine cancels the parent coroutine, and the parent then cancels all of its children. Sometimes we do not want a failing child to cancel the parent; SupervisorJob is designed for that case

Create a supervisor job object

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        launch(SupervisorJob(coroutineContext[Job]) + CoroutineExceptionHandler { _, _ -> }) {
            throw NullPointerException()
        }

        delay(500)
        println("( Process.kt:13 ) ")
    }

    println("( Process.kt:16 ) finish")
}

  1. A CoroutineExceptionHandler must be added, otherwise the exception is still passed upward and cancels the parent coroutine

  2. When a scope is created with a plain SupervisorJob(), its lifecycle is detached from the parent coroutine, so it is not cancelled when the parent is cancelled. To keep the lifecycles linked, pass the parent’s Job with coroutineContext[Job]

  3. A SupervisorJob only isolates exceptions thrown by coroutines started inside its scope; an exception thrown directly in the scope body is not isolated

    supervisorScope {
        // throw NoSuchFieldException() // Thrown directly in the scope body, this would crash

        launch {
            throw NoSuchFieldException() // Not thrown upward
        }
    }

A SupervisorJob passed to withContext or async has no effect

Directly create a supervisor scope in which exceptions only propagate downward

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

  • The function blocks the calling scope until it finishes
  • It has a return value
  • supervisorScope still uses the Job of the current scope, so it is cancelled along with the lifecycle of the current scope

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {

        // Exceptions only propagate downward, provided a CoroutineExceptionHandler is set inside the scope
        supervisorScope {
            launch(CoroutineExceptionHandler { _, _ -> }) {
                throw NullPointerException()
            }

            launch {
                delay(1000) // This keeps running even though the launch above throws an exception
                println("( Process.kt:18 ) ")
            }
        }
    }

    println("( Process.kt:16 ) finish")
}

Catching exceptions

Catching exceptions in a scope differs from ordinary exception catching

  • CoroutineExceptionHandler: can catch the exceptions of all child scopes
  • async: an exception thrown inside can be contained with a supervisor job, but the await call still needs try/catch
  • launch: requires a supervisor job and an exception handler together
  • withContext/supervisorScope/coroutineScope/select: exceptions can be caught with try/catch
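Two of these cases in a small sketch: try/catch around coroutineScope, and try/catch around await inside supervisorScope. The function name and the exception messages are made up for illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the messages of the exceptions that try/catch managed to catch.
fun catchDemo(): List<String> = runBlocking {
    val caught = mutableListOf<String>()

    // coroutineScope rethrows the failure of a child to its caller.
    try {
        coroutineScope {
            launch { throw IllegalStateException("from coroutineScope") }
        }
    } catch (e: IllegalStateException) {
        caught += e.message ?: ""
    }

    // Inside supervisorScope a failed async does not cancel the scope;
    // the exception surfaces at await() and can be caught there.
    supervisorScope {
        val deferred = async<Int> { throw IllegalArgumentException("from async") }
        try {
            deferred.await()
        } catch (e: IllegalArgumentException) {
            caught += e.message ?: ""
        }
    }
    caught
}
```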

Primitive coroutines

Function                            Callback parameter         Description
suspendCoroutine                    Continuation               Result
suspendCancellableCoroutine         CancellableContinuation    Can be cancelled
suspendAtomicCancellableCoroutine   CancellableContinuation    Can be cancelled

[Continuation]

public val context: CoroutineContext
public fun resumeWith(result: Result<T>)

[CancellableContinuation] (extends Continuation)

public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean

public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
public fun tryResume(value: T, idempotent: Any? = null): Any?
public fun tryResumeWithException(exception: Throwable): Any?
public fun completeResume(token: Any)

public fun cancel(cause: Throwable? = null): Boolean

public fun invokeOnCancellation(handler: CompletionHandler)
public fun CoroutineDispatcher.resumeUndispatched(value: T)
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
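A typical use of these functions is wrapping a callback-style API into a suspend function. The sketch below assumes a hypothetical `fetchAsync` API that reports its result from a background thread; only `suspendCancellableCoroutine`, `resume`, `resumeWithException` and `invokeOnCancellation` come from the library.

```kotlin
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback-style API used only for this sketch.
fun fetchAsync(onSuccess: (String) -> Unit, onError: (Throwable) -> Unit): Thread =
    thread {
        try {
            Thread.sleep(50)
            onSuccess("data")
        } catch (e: InterruptedException) {
            onError(e)
        }
    }

// Bridges the callback API into a suspend function.
suspend fun fetch(): String = suspendCancellableCoroutine { cont ->
    val worker = fetchAsync(
        onSuccess = { cont.resume(it) },
        onError = { cont.resumeWithException(it) }
    )
    // If the coroutine is cancelled while waiting, stop the background work too.
    cont.invokeOnCancellation { worker.interrupt() }
}

fun fetchDemo(): String = runBlocking { fetch() }
```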

Thread safety

Ways to solve thread-safety problems

  1. A mutex
  2. Switching to a single thread
  3. Channel

Mutex

Mutex is the coroutine counterpart of Java’s Lock

Create a mutex

public fun Mutex(locked: Boolean = false): Mutex
// locked: the initial state, whether the mutex starts out locked

Lock and unlock automatically with the extension function

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner: the token that identifies who holds the lock

Functions

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
public fun holdsLock(owner: Any): Boolean
// Whether the mutex is currently locked by this owner
public fun tryLock(owner: Any? = null): Boolean
// Try to lock with this owner; returns false if the mutex is already locked
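A minimal sketch of protecting a shared counter with withLock. The counts (100 coroutines, 100 increments each) and the function name are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Increments a shared counter from 100 coroutines running on a thread pool.
fun mutexCounterDemo(): Int = runBlocking {
    val mutex = Mutex()
    var counter = 0
    withContext(Dispatchers.Default) {        // returns once all children have finished
        repeat(100) {
            launch {
                repeat(100) {
                    mutex.withLock { counter++ }   // only one coroutine at a time gets in
                }
            }
        }
    }
    counter
}
```

Without the mutex some increments could be lost, because `counter++` is not atomic across threads.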

Channel

  1. Multiple scopes can send and receive data through a single Channel object
  2. Channel is modelled on Go's chan and can be used to control the suspension and resumption of a scope (together with select)

Channel is an interface and cannot be instantiated directly. We create an implementation of it with the function Channel()

The source code

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
  when (capacity) {
    RENDEZVOUS -> RendezvousChannel() // no buffer
    UNLIMITED -> LinkedListChannel() // unlimited buffer
    CONFLATED -> ConflatedChannel()  // conflated
    BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
    else -> ArrayChannel(capacity) // the specified buffer size
  }
  • capacity

    The buffer size, 0 by default. With no buffer, sending an element suspends the sender (the code after send does not run) until the element has been received, and only then does the sender resume. A buffer size can be set to avoid this

A channel can be iterated to receive the data sent to it

val channel = Channel<Int>()
for (c in channel) {

}
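To illustrate both the capacity parameter and iteration, here is a small sketch. The helper sendAndCollect is a name invented for this example; it sends three values through a channel of the given capacity and collects them with a for loop.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sends 1..3 through a channel of the given capacity and collects what arrives.
suspend fun sendAndCollect(capacity: Int): List<Int> = coroutineScope {
    val channel = Channel<Int>(capacity)
    launch {
        for (i in 1..3) channel.send(i) // suspends while there is no room for the element
        channel.close()                 // ends the for loop on the receiving side
    }
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() = runBlocking {
    println(sendAndCollect(Channel.RENDEZVOUS)) // [1, 2, 3]
    println(sendAndCollect(Channel.BUFFERED))   // [1, 2, 3]
}
```

The received values are the same in both cases; the capacity only changes how often the sender has to suspend.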
public suspend fun yield(): Unit

Channel

The Channel interface implements both SendChannel and ReceiveChannel interfaces, so it can send and receive data

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

SendChannel

public val isClosedForSend: Boolean
// Whether the channel is closed for sending
public fun close(cause: Throwable? = null): Boolean
// Close the sending channel

public fun offer(element: E): Boolean
// Sends without suspending; the send function is recommended
public suspend fun send(element: E)
// Send an element

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
// Perform a callback when the channel is closed

public val onSend: SelectClause2<E, SendChannel<E>>
// Send data immediately (if allowed), used in select

  • Once the sending channel is closed, the ReceiveChannel can no longer be used to receive data; a ClosedReceiveChannelException is thrown

ReceiveChannel

public val isClosedForReceive: Boolean
// Whether the SendChannel has closed the channel. If elements are still buffered after closing, this returns false until they have been received

public val isEmpty: Boolean // Whether the channel is empty

public fun poll(): E? // Receives without suspending; the receive function is recommended
public suspend fun receive(): E
// Receive an element from the channel

public val onReceive: SelectClause1<E> // Throws an exception if the channel is closed
public val onReceiveOrNull: SelectClause1<E?> // Deprecated; returns null if the channel is closed
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// Clauses used in select. The third one is recommended

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// The `ValueOrClosed` object tells whether the channel is closed

public fun cancel(cause: CancellationException? = null)
// Cancel the channel
  1. Sending to and receiving from a channel suspend the scope, but sent elements can be buffered so that sending does not suspend, and the scope resumes once the channel can proceed
  2. send and receive may only be called in suspend functions, but creating a channel is not restricted
  3. Closing a channel may cause receive to throw an exception
  4. After close is called on the SendChannel, no more data may be sent or received; otherwise an exception is thrown
  5. Cancelling (cancel) the scope in which send | receive is called does not close the channel (isClosedForReceive returns false)
  6. Calling receive directly instead of iterating can leave the scope suspended when no more elements arrive

consume

ReceiveChannel can receive events not only through an iterator but also through the consume family of functions. There is essentially no difference between consume and iteration, except that consume automatically cancels the channel (via the cancel function) when an exception occurs

The source code

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    var cause: Throwable? = null
    try {
        return block() // Return directly
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        cancelConsumed(cause) // Cancel the channel, passing on the exception if one occurred
    }
}

The consumeEach function simply iterates over the received events and cancels the channel automatically when an exception occurs. It is generally recommended to use the consume functions to receive events
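A short sketch of receiving with consumeEach follows. The function name sumWithConsumeEach is invented for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Sums everything sent through the channel until it is closed.
suspend fun sumWithConsumeEach(): Int = coroutineScope {
    val channel = Channel<Int>()
    launch {
        for (i in 1..5) channel.send(i)
        channel.close()
    }
    var sum = 0
    // Receives every element; if the lambda threw, the channel would be cancelled.
    channel.consumeEach { sum += it }
    sum
}

fun main() = runBlocking {
    println(sumWithConsumeEach()) // prints 15
}
```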

BroadcastChannel

This channel differs from a normal channel in that every element can be received by every subscriber. With a normal channel, once an element has been received no other coroutine can receive it

A broadcast channel is created through a global function

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>

BroadcastChannel itself extends SendChannel and can only send data. A receive channel is obtained through the function

public fun openSubscription(): ReceiveChannel<E>

Cancel the channel

public fun cancel(cause: CancellationException? = null)

Convert a Channel to a BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>

Quickly create a broadcast channel in a coroutine scope with an extension function

public fun <E> CoroutineScope.broadcast(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E>

Iterative channel

Receive channels can be iterated because they overload the iterator operator

public operator fun iterator(): ChannelIterator<E>

The sample

for (i in produce) {
    // Runs once for each element received
}

When multiple coroutines receive data from the same channel, they will receive data in turn. The channel is fair to multiple coroutines

Produce

The previous examples create a Channel object to send and receive data, but the produce extension function can also quickly create and return a ReceiveChannel that already has data being sent into it

public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
  • context: the coroutine context, which can specify things such as the scheduler
  • capacity: the buffer size of the channel

The ProducerScope interface extends both SendChannel and CoroutineScope, so it can send channel data and also acts as a coroutine scope.

When the produce scope completes, the channel is closed. As mentioned earlier, a closed channel cannot continue to receive data
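As a small sketch of produce, the following extension (the name squares is invented here) returns a ReceiveChannel that closes by itself once the block finishes, so the for loop ends without an explicit close call.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// The returned channel is closed automatically when the producer block completes.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i * i)
}

fun main() = runBlocking {
    for (value in squares(3)) println(value) // prints 1, 4, 9 on separate lines
}
```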

Waiting for cancellation

This function invokes its callback when the channel is closed or cancelled. As mentioned earlier, when a coroutine is cancelled we can release resources in finally, but finally does not cover the cancellation of a channel, so this function is used instead

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {})
// The block runs when [SendChannel.close] or [ReceiveChannel.cancel] ends the channel

Actor

Actor functions can be used to create a coroutine scope with channel action

public fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, // todo: Maybe Channel.DEFAULT here?
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
  • context: the coroutine context
  • capacity: the buffer size of the channel
  • start: the coroutine start mode
  • onCompletion: the completion callback
  • block: the callback in which the channel data is handled

This function is similar to the produce function:

  1. produce returns a ReceiveChannel, so data is received from outside; actor returns a SendChannel, so data is sent from outside
  2. The actor callback has the property channel: Channel, which can both send and receive data. In produce, the channel property is a SendChannel
  3. Whether it is produce or actor, the underlying channel is a Channel that can both send and receive data; only a cast is needed
  4. Channel itself supports two-way communication, but produce and actor are designed around the producer-consumer pattern
  5. Both are a combination of a coroutine scope and a data channel

Ticker

Both RxJava and coroutines provide a ticker. The network request library I wrote also provides one, with features such as pause | resume | reset | multiple observers

The coroutine ticker described here is relatively simple

public fun ticker(
    delayMillis: Long,
    initialDelayMillis: Long = delayMillis,
    context: CoroutineContext = EmptyCoroutineContext,
    mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit>

The data emitted by this channel is Unit

By default, the channel sends a Unit each time the specified interval elapses

fun main() = runBlocking<Unit> {

    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

    // Prints once per second
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}

However, the downstream may not receive an element as soon as it is sent, for example when it delays before calling receive

The TickerMode enumeration has two values

  • FIXED_PERIOD: the default. The delay is adjusted dynamically so that the sending period stays fixed; the interval can be regarded as the interval between upstream sends
  • FIXED_DELAY: the interval is counted only after the element has been received; the interval can be regarded as the delay between downstream receptions

This ticker does not support multiple subscriptions | pause | resume | reset | completion, but the Interval object in my Net library implements all of these

Select

Inside the select callback you listen for results from several Deferred/Channel objects, and only the callback of the one that delivers data fastest is executed.

action

In the previous function introduction, you can see a series of on{action} variables whose values are all SelectClause{number} interface objects;

[SelectBuilder]

public interface SelectBuilder<in R> {
    public operator fun SelectClause0.invoke(block: suspend () -> R)
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}

The extension functions defined here allow the actions to be invoked directly

Object          Function used
SelectClause0   onJoin
SelectClause1   onReceive
SelectClause2   onSend

The sample

@ObsoleteCoroutinesApi
@UseExperimental(InternalCoroutinesApi::class)
suspend fun selectMulti(a: Channel<Int>, b: Channel<Int>): String = select<String> {

    b.onReceive {
        "b $it" // Runs first because it is registered first, not because of which function is used
    }

    b.onReceiveOrClosed {
        "b $it"
    }

    a.onSend(23) {
        "Send 23"
    }
}

fun main() = runBlocking<Unit> {
    val a = Channel<Int>(1) // A buffer of 1 so that sending does not suspend
    val b = Channel<Int>(1)

    launch {
        b.send(24)
        val s = selectMulti(a, b)
        println("Result = $s")
    }
}

  • onReceive: a closed channel causes an exception to be thrown; use onReceiveOrClosed instead if you do not want the exception
  • onSend: equivalent to Channel.send, it sends a value. If several onSend clauses are registered, the callback of the first one returns the result
  • Even when one clause has been selected, the coroutines behind the other clauses do not end their scope

[ValueOrClosed]

public val isClosed: Boolean // Whether the channel is closed

public val value: T
public val valueOrNull: T?
// Both return the value from the channel, but the second returns null instead of throwing an exception if the channel is closed

  1. When a channel has both a send clause and a receive clause in one select, an exception is thrown if both could execute (i.e. the select would run without suspending)
  2. If a channel is listened to repeatedly (several clauses), the first one registered is executed
  3. Closing a channel is also delivered as an event: onReceive throws an exception, and with onReceiveOrClosed the value is null

Flow

Like RxJava, Flow is divided into three parts:

  1. The upstream
  2. The operators
  3. The downstream

The downstream must receive events inside a coroutine scope (a suspend function)

Create a Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>

The sample

fun shoot() = flow {
    for (i in 1..3) {
        delay(1000) // pretend we did something useful here
        emit(i) // send the next value
    }
}

  • Collections or sequences can be turned into Flow objects using the asFlow function

  • You can also create Flow objects directly using flowOf, as you would a collection

  • A Channel can be converted to a Flow

    public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>

  • Even suspend functions can be converted to a Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>

The callbacks of collect and flow are themselves suspend functions, so a coroutine scope can be started inside them

Function to create Flow

Function       Description
flow           A normal Flow
channelFlow    Creates a Flow backed by a channel; supports buffering and allows events to be sent from different CoroutineContexts
callbackFlow   Same as channelFlow, except that an error is raised if awaitClose is not used
emptyFlow      An empty Flow
flowOf         Emits the given values directly

The emit function of flow is not thread-safe and must not be called from other threads. If you need thread safety, use channelFlow instead of flow

channelFlow uses the send function to send data

Sample emission data

flow<Int> {
  emit(23)
}

channelFlow<Int> {
  send(23) // offer(23)
}
  1. offer can be used in a non-suspend function; send must be used in a suspend function
  2. offer has a return value, which is false if there is no room for the element, whereas send suspends until room becomes available.
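The thread-safety difference between flow and channelFlow can be sketched as follows. The function name concurrentNumbers is invented for this example; it sends from two coroutines that may run on different threads, which is allowed in channelFlow but would fail with emit inside flow { }.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Two producers on different dispatchers send into the same flow.
fun concurrentNumbers(): Flow<Int> = channelFlow {
    launch(Dispatchers.Default) { send(1) }
    launch(Dispatchers.IO) { send(2) }
    // The flow completes once both launched coroutines have finished.
}

fun main() = runBlocking {
    // The arrival order is not fixed, so the values are sorted before printing.
    println(concurrentNumbers().toList().sorted()) // [1, 2]
}
```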

With callbackFlow, resources can be released when the scope is cancelled. The following demonstrates registering and unregistering the broadcast receiver AppWidgetProvider

callbackFlow<Int> {
  val appWidgetProvider = AppWidgetProvider()
  registerReceiver(appWidgetProvider, IntentFilter()) // register
  awaitClose {  // This callback is called when the coroutine scope is cancelled
    unregisterReceiver(appWidgetProvider) // unregister
  }
}.collect { 

}

collect

Collecting data

Flow is cold: data is emitted only when the function collect is called. Functions like this are also called terminal operators

flow {
  emit(23)
}.collect {
  System.err.println("(Demo.kt:9)    it = $it")
}

A look at the source code shows that emit actually invokes the function passed to collect

The collect function receives data from the upstream

public suspend fun Flow<*>.collect()
// A collector that does no processing and only triggers the emission of data

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit
// Collects each value

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
// Unlike the previous function, if the upstream emits the next value before the downstream has finished processing, the processing of the previous value is cancelled

public suspend inline fun <T> Flow<T>.collectIndexed(
    crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
// A collector that receives both the index and the value

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// Collects the Flow in the specified coroutine scope

[FlowCollector] the emitter

public suspend fun emit(value: T)
// Emit a value

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
// Emit all values of another Flow

The scheduler


By default, a Flow runs in the thread or coroutine context in which it is collected. A Flow must not use withContext internally to switch schedulers; use the flowOn function instead

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

This function changes the thread on which the body of the Flow emits, while collection automatically switches back to the thread of the collector
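A minimal sketch of flowOn, assuming Dispatchers.Default as the upstream scheduler: the flow body reports the thread it runs on, and the collector prints its own thread next to it. The function name upstreamThread is invented here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The flow body runs on Dispatchers.Default because of flowOn.
fun upstreamThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    upstreamThread().collect { name ->
        // The collector still runs on the thread that called collect.
        println("emitted on $name, collected on ${Thread.currentThread().name}")
    }
}
```

The exact thread names depend on the runtime, so no fixed output is shown.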

The cache

Instead of waiting for the collector to finish, emitted data is held in a buffer temporarily, which improves performance

By default, buffering is applied automatically when the scheduler is switched

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>

The conflate function is really a buffer in which older upstream values are discarded when they cannot be processed in time

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
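The difference between buffer and conflate can be sketched with a slow collector. The names numbers and collectSlowly and the delay values are chosen only for this illustration; the collector is deliberately much slower than the emitter so that values pile up.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits 1, 2, 3 with 50 ms between values.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(50)
        emit(i)
    }
}

// Takes 500 ms to handle each value and records what it saw.
suspend fun collectSlowly(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect { value ->
        delay(500)
        seen += value
    }
    return seen
}

fun main() = runBlocking {
    println(collectSlowly(numbers().buffer()))   // [1, 2, 3]: every value is kept
    println(collectSlowly(numbers().conflate())) // [1, 3]: 2 is replaced by 3 while 1 is processed
}
```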

merge

Multiple events are combined and sent downstream

zip

The callback function processes one value from each of the two flows and returns a new value, R

When the two flows differ in length, only as many events as the shorter flow contains are sent

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

The sample

val nums = (1..3).asFlow().onEach { delay(300) } // the numbers 1..3, 300 milliseconds apart
val strs = flowOf("one", "two", "three").onEach { delay(400) } // a string every 400 milliseconds
val startTime = System.currentTimeMillis() // Record the start time
nums.zip(strs) { a, b -> "$a -> $b" } // Combine into a single string with "zip"
    .collect { value -> // Collect and print
        println("$value at ${System.currentTimeMillis() - startTime} ms from start")
    }

combine

public fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R
): Flow<R>
// Combines two flows. Once each has emitted at least one value, a new value from either side triggers an emission, paired with the most recent value of the other side

public fun <T1, T2, R> Flow<T1>.combineTransform(
    flow: Flow<T2>,
    @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
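For comparison with zip, here is a sketch of combine using the same two inputs as the zip sample. The function name combined is invented here. The intermediate pairs depend on timing, but the first pair needs one value from each side and the last pair always holds the final value of each side.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Pairs each new value with the most recent value from the other flow.
fun combined(): Flow<String> {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) }
    return nums.combine(strs) { a, b -> "$a -> $b" }
}

fun main() = runBlocking {
    // Starts with "1 -> one" and ends with "3 -> three"; usually more than three lines are printed.
    combined().collect { println(it) }
}
```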

Collections

Functions that convert a Flow directly into a collection

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C

Accumulation

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc` is the value returned by the previous call of the callback. Unlike reduce, fold supports an initial value; reduce starts by passing the first two elements to the callback

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>

conversion

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>

public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>): Flow<R> = map(transform).flattenMerge(concurrency)
// Maps each upstream element to a Flow in the callback and collects those flows concurrently

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// Collects the inner flows one after another, like RxJava's concatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
// Collect data sequentially

public fun <T> Flow<Flow<T>>.flattenMerge(
  concurrency: Int = DEFAULT_CONCURRENCY
): Flow<T>
// Collect data concurrently

public inline fun <T, R> Flow<T>.flatMapLatest(
  @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R>
// Cancels the collection of the previous inner flow each time a new value is emitted

public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
// Include the element index
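A short sketch of flatMapLatest follows. The function name latestOnly and the delays are chosen for this illustration: each upstream value starts an inner flow, and the arrival of the next upstream value cancels the inner flow that is still waiting.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A new upstream value arrives every 100 ms; each inner flow needs 500 ms to finish.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestOnly(): Flow<String> =
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapLatest { id ->
            flow {
                emit("$id: first")
                delay(500) // cancelled here when the next upstream value arrives
                emit("$id: second")
            }
        }

fun main() = runBlocking {
    println(latestOnly().toList()) // [1: first, 2: first, 3: first, 3: second]
}
```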

The life cycle

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// Called before the flow starts to be collected

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// If `cause` is null, the flow completed without an exception; otherwise `cause` is the exception that was thrown.
// Like the catch function, it only observes exceptions that occur upstream, but it cannot suppress them. It only runs the callback before the exception is thrown on

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
// This function only catches upstream exceptions. An exception that occurs below this call is still thrown
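The order in which these callbacks run can be sketched with a flow that fails after one value. The names failing and collectEvents are invented for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits one value and then fails.
fun failing(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("boom")
}

// Records the order in which the lifecycle callbacks are invoked.
suspend fun collectEvents(): List<String> {
    val events = mutableListOf<String>()
    failing()
        .onStart { events += "start" }
        .onCompletion { cause -> events += "completed with ${cause?.message}" }
        .catch { e -> events += "caught ${e.message}" } // swallows the upstream exception
        .collect { events += "value $it" }
    return events
}

fun main() = runBlocking {
    println(collectEvents()) // [start, value 1, completed with boom, caught boom]
}
```

onCompletion sees the exception but lets it continue downstream, where catch finally handles it.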

filter

Limiting what a flow emits

public fun <T> Flow<T>.take(count: Int): Flow<T>
// Only the specified number of events is accepted
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// Discard the specified number of events
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// The callback decides whether to take or drop. Once it first returns false, it is no longer consulted: takeWhile stops the flow and dropWhile passes everything that follows

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public suspend fun <T> Flow<T>.single(): T
// Expects exactly one element, otherwise `IllegalStateException` is thrown
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// Does not throw; returns null unless there is exactly one element

public suspend fun <T> Flow<T>.first(): T
// NoSuchElementException is thrown if there is no element
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// Returns the first element for which the callback returns true
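A few of these operators applied to the numbers 1 to 10, as a quick sketch. The function name oneToTen is made up for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// A cold flow of the numbers 1..10; every terminal operator below collects it afresh.
fun oneToTen(): Flow<Int> = (1..10).asFlow()

fun main() = runBlocking {
    println(oneToTen().filter { it % 2 == 0 }.take(2).toList()) // [2, 4]
    println(oneToTen().takeWhile { it < 3 }.toList())           // [1, 2]
    println(oneToTen().dropWhile { it < 9 }.toList())           // [9, 10]
    println(oneToTen().drop(8).toList())                        // [9, 10]
    println(oneToTen().first { it > 3 })                        // 4
}
```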

retry

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE, // Retry times
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

public fun <T> Flow<T>.retryWhen(
  predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
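A sketch of retry with a flow that fails on its first two collections follows. The names flaky and fetchWithRetry and the use of IOException are assumptions made for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger

// Fails on the first two collections and succeeds on the third.
fun flaky(attempts: AtomicInteger): Flow<String> = flow {
    val attempt = attempts.incrementAndGet()
    if (attempt < 3) throw IOException("attempt $attempt failed")
    emit("ok")
}

// Retries up to 5 times, but only for IOException.
suspend fun fetchWithRetry(): Pair<String, Int> {
    val attempts = AtomicInteger()
    val values = flaky(attempts)
        .retry(retries = 5) { cause -> cause is IOException }
        .toList()
    return values.single() to attempts.get()
}

fun main() = runBlocking {
    val (value, attempts) = fetchWithRetry()
    println("$value after $attempts attempts") // prints "ok after 3 attempts"
}
```

Each retry collects the upstream flow again from the start, which is why the attempt counter lives outside the flow.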

transform

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// The transform function can emit new elements inside the callback

public fun <T, R> Flow<T>.transformLatest(
  @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

The difference between scan and reduce is

  • reduce: the result is collected only after all accumulation steps have finished
  • scan: a value is collected after each accumulation step
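The difference can be seen in a small sketch. The function names total and runningTotals are invented here: reduce is a terminal operator that returns one value, while scan returns a Flow of every intermediate value, starting with the initial one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// reduce: a single result once the whole flow has been processed.
suspend fun total(): Int = flowOf(1, 2, 3, 4).reduce { acc, value -> acc + value }

// scan: the initial value followed by each intermediate sum.
suspend fun runningTotals(): List<Int> =
    flowOf(1, 2, 3, 4).scan(0) { acc, value -> acc + value }.toList()

fun main() = runBlocking {
    println(total())         // 10
    println(runningTotals()) // [0, 1, 3, 6, 10]
}
```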

StateFlow/SharedFlow

Class relationships

SharedFlow
|- MutableSharedFlow
|- StateFlow
   |- MutableStateFlow

SharedFlow is a hot flow: data is sent even when nobody is collecting, and it is replayed to collectors that arrive later. You can use shareIn to turn a cold flow into a hot one. It can also be created directly with the following function

public fun <T> MutableSharedFlow(
    replay: Int = 0, // Number of values replayed to new collectors
    extraBufferCapacity: Int = 0, // Buffer size (not including the replay count)
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

Using BufferOverflow

  1. DROP_LATEST: discards the latest value
  2. DROP_OLDEST: discards the oldest value
  3. SUSPEND: suspends the sender
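The replay parameter can be sketched as follows. The function name lateSubscriber is invented here: three values are emitted before anyone collects, and a collector that arrives afterwards still receives the last two.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits before there is any collector, then collects the replayed values.
suspend fun lateSubscriber(): List<Int> {
    val shared = MutableSharedFlow<Int>(replay = 2)
    for (i in 1..3) shared.emit(i) // with no subscribers, emit returns immediately
    // A SharedFlow never completes, so take(2) is used to stop collecting.
    return shared.take(2).toList()
}

fun main() = runBlocking {
    println(lateSubscriber()) // [2, 3]
}
```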

StateFlow can be seen as a Flow with the features of LiveData added. However, it does not follow a lifecycle, so data can be collected at any time

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

The sample

Change a flow from cold to hot using the function shareIn

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

SharingStarted:

  1. WhileSubscribed keeps the upstream provider active while there are subscribers
  2. Lazily starts sharing data after the first subscriber appears and keeps the data stream active forever
  3. Eagerly starts the provider immediately

Android

Many components in Google’s Jetpack release come with KTX extension dependencies, mainly to add kotlin and coroutine support

Lifecycle

The official way to quickly create a lifecycle-aware coroutine scope:

  • Runs coroutines at a specified lifecycle state
  • Automatically cancels the coroutines in onDestroy

Introduce the KTX dependency library

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"

Run coroutines when execution reaches a certain lifecycle

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast(
    minState: Lifecycle.State,
    block: suspend CoroutineScope.() -> T
): T

These functions are all extensions of Lifecycle and LifecycleOwner

LiveData

Dependency

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"

These are the only two functions that developers use. They do the same thing and differ only in the unit of time in which the timeout parameter is passed

fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeout: Duration,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)

  • timeout: if the liveData has no active observers, the [block] scope is cancelled after the specified time (in milliseconds)
  • block: this scope runs only while the liveData is active. By default it uses the Dispatchers.Main.immediate scheduler

Inside the liveData scope you can emit both data and LiveData

interface LiveDataScope<T> {
    /**
     * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously,
     * calling [emit] will remove that source.
     *
     * Note that this function suspends until the value is set on the [LiveData].
     *
     * @param value The new value for the [LiveData]
     *
     * @see emitSource
     */
    suspend fun emit(value: T)

    /**
     * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this
     * method will remove any source that was yielded before via [emitSource].
     *
     * @param source The [LiveData] instance whose values will be dispatched from the current
     * [LiveData].
     *
     * @see emit
     * @see MediatorLiveData.addSource
     * @see MediatorLiveData.removeSource
     */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /**
     * References the current value of the [LiveData].
     *
     * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this
     * value to check what was then latest value `emit`ed by your `block` before it got cancelled.
     *
     * Note that if the block called [emitSource], then `latestValue` will be last value
     * dispatched by the `source` [LiveData].
     */
    val latestValue: T?
}

  1. emitSource has no effect if it is executed before emit, because emit removes the source
  2. This scope runs each time the liveData becomes active, for example again when the application returns from the background to the foreground, but observers only receive data while they are active