Public number: byte array, keen to share Android system source code parsing, Jetpack source code parsing, popular open source library source code parsing and other essential knowledge interview

Recently, I have been learning about kotlin coroutines, and the best learning materials are naturally the official learning documents. After reading them, I have the idea of translating the official documents. Before and after spent close to a month time, a total of nine articles, here also share out, hope to help readers. Limited by personal knowledge, some translation is not too smooth, also hope that readers can put forward their opinions

Coroutines official documentation: Coroutines – Guide

Coroutines-cn-guide coroutines-cn-guide

Coroutine official document Chinese translator: leavesC

Deferred values provide a convenient way to pass single values between coroutines, while Channels provide a way to transmit streams of values

1. Channel Basics

A channel is very similar in concept to BlockingQueue, with one key difference: instead of a blocking PUT and a blocking take, a channel has a pending send and a pending receive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1.. 5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
//sampleEnd
}
Copy the code

The output is:

1
4
9
16
25
Done!
Copy the code

Closing and iteration over Channels

Unlike a queue, a channel can be closed to indicate that an element has been sent. On the receiver side, it is convenient to use the regular for loop to receive elements from the channel

Conceptually, close is similar to sending a special Cloase tag to a channel. Once the close tag is received, the iteration stops, so all elements sent before the close are guaranteed to be received:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking {
//sampleStart
    val channel = Channel<Int>()
    launch {
        for (x in 1.. 5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
//sampleEnd
}
Copy the code

3. Building channel Producers

Patterns in which coroutines generate sequences of elements are very common. This is part of the producer-consumer pattern that is often found in concurrent programming. You could abstract such a producer as a function that takes a channel as an argument, but this goes against the common sense that you must return a result from the function

There is a handy coroutine constructor called Product that makes it easy to perform this operation on the producer side; There is also an extension function consumerEach, which replaces the for loop on the consumer side:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(a): ReceiveChannel<Int> = produce {
    for (x in 1.. 5) send(x * x)
}

fun main(a) = runBlocking {
//sampleStart
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
//sampleEnd
}
Copy the code

4. Pipelines

A pipe is a pattern, a stream of values that a coroutine is generating that may be infinitely many elements

fun CoroutineScope.produceNumbers(a) = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
Copy the code

There are one or more coroutines that value the stream of values, do some processing, and produce some other result. In the following example, each return value is also the square of the input parameter value (number)

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
Copy the code

Start and connect the entire pipeline:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking {
//sampleStart
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
//sampleEnd
}

fun CoroutineScope.produceNumbers(a) = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}
Copy the code

All functions that create coroutines are defined as extensions of CoroutineScope, so we can rely on structured concurrency to ensure that there are no global coroutines that are delayed in the application

Prime Numbers with Pipeline

Let’s take the pipe to the extreme with an example of using coroutine pipes to generate prime numbers. We start with an infinite sequence of numbers

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}
Copy the code

The following pipes filter the incoming stream of numbers, removing all numbers that are divisible by a given prime:

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if(x % prime ! =0) send(x)
}
Copy the code

Now, we get a prime number from the current channel by starting a stream of numbers at 2 and start a new channel for each prime number found:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7)...Copy the code

The following example code prints the first ten primes and runs the entire pipe in the context of the main thread. Because all coroutines are started within the scope of the main runBlocking coroutine, we do not have to keep explicit references to all started coroutines. We use the cancelChildren extension function to cancel all subcoroutines after the first ten primes

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking {
//sampleStart
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd    
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if(x % prime ! =0) send(x)
}
Copy the code

Running result:

2
3
5
7
11
13
17
19
23
29
Copy the code

Note that you can use the iterator coroutine constructor from the standard library to build the same pipe. Replace product with iterator, send with yield, receive with next, ReceiveChannel with iterator, and remove the coroutine scope. And you don’t have to use runBlocking anymore. However, the advantage of a pipeline using the channel shown above is that it can actually utilize multiple cpus to execute the code if run in the context of dispatchers.default

But in any case, the alternative described above is also a very impractical way to find primes. In fact, pipes do involve some other pending calls (such as asynchronous calls to remote services), and these pipes cannot be built using sequence/iterator because they do not allow arbitrary suspending, whereas Products are completely asynchronous

6. Fan-out

Multiple coroutines can receive data from the same channel and assign tasks among them. Let’s start with a producer coroutine that periodically generates integers (10 per second) :

fun CoroutineScope.produceNumbers(a) = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) / / wait 0.1 s}}Copy the code

Then we can have multiple processor coroutines. In this case, they just print their ID and the number they received:

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")}}Copy the code

Now let’s boot up five processors and let them work for almost a second. Here’s what happens:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking<Unit> {
//sampleStart
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
//sampleEnd
}

fun CoroutineScope.produceNumbers(a) = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) / / wait 0.1 s}}fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")}}Copy the code

Although the processor ID that receives each particular integer may be different, the result of the run will be output similar to the following:

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
Copy the code

Note that canceling the Producer coroutine closes its channel and ultimately terminates the iteration on the channel that the Processor coroutine is executing

Also, notice how we use a for loop to explicitly iterate over the channel to execute fan-out in the launchProcessor code. Unlike consumeEach, this for loop pattern is completely safe to use across multiple coroutines. If one of the processor coroutines fails, the other processors still process the channel, and the processor that writes through consumeEach always consumes (cancels) the underlying channel on normal or abnormal completion

7. Fan-in

Multiple coroutines can be sent to the same channel. For example, there is a string channel and a suspend function that repeatedly sends the specified string to the channel with the specified delay:

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
Copy the code

Now, let’s see what happens if we start two coroutines to send strings (in this case, we start them as subcoroutines of the main coroutine, in the context of the main thread) :

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking {
//sampleStart
    val channel = Channel<String>()
    launch { sendString(channel, "foo".200L) }
    launch { sendString(channel, "BAR!".500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
//sampleEnd
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}
Copy the code

Running result:

foo
foo
BAR!
foo
foo
BAR!
Copy the code

Buffered channels

None of the channels shown so far have buffers. Unbuffered channels transport elements when the send and receive operations are invoked by both the sender and the receiver. If send is called first, it hangs until Receive is called; If receive is called first, it hangs until Send is called

Both the Channel() factory function and the produce builder take the optional parameter capacity to specify the buffer size. Buffering is used to allow a sender to send multiple elements before suspending, similar to BlockingQueue with a specified capacity, which blocks only when the buffer is full

See the following code in effect:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking<Unit> {
//sampleStart
    val channel = Channel<Int> (4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full}}// don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
//sampleEnd    
}
Copy the code

A buffer channel of capacity 4 is used, so it will be printed five times:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
Copy the code

The first four elements are added to the buffer, and the sender hangs when trying to send the fifth element

The Channels are fair.

The send and receive operations on channels are fair to the order in which they are called from multiple coroutines. They are provided in a first-in, first-out order, for example, the coroutine that calls Receive first gets the element first. In the following example, two coroutines “ping” and “Pong” receive the “ball” object from the shared “table” channel

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

//sampleStart
data class Ball(var hits: Int)

fun main(a) = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back}}//sampleEnd
Copy the code

The “ping” coroutine runs first, so it is the first to receive the ball. Even if the “ping” coroutine receives the ball immediately after sending it back to the table, the ball will still be received by “Pong” because it was already waiting for it:

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
Copy the code

Note that sometimes, due to the nature of the executor used, a channel can have an execution effect that appears unfair. See this issue for more information

Ticker Channels

The timer channel is a special rendezvous channel that returns the Unit value at the end of each given delay time since the last consumption of the channel. Although it may seem useless, it is a useful building block for creating complex time-based produce pipes and for window-building and other time-based processing. The timer channel can be used for SELECT to perform the “on tick” operation

To create such a channel, use the factory method ticker. If you don’t need the channel to send any more elements, use Receivechannel. cancel on it

Now let’s see how it works in practice:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(a) = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
Copy the code

Running result:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
Copy the code

Note that the Ticker is aware that the consumer may be paused, and by default, if a pause occurs, will delay the generation of the next element, trying to maintain a fixed rate of generated elements

Optionally, the mode argument to the ticker function can be specified as tickermode.fixed_delay to ensure a fixed delay between elements