Happy New Year, everybody. Last year I wrote a quick introduction to Kotlin coroutines that covered some of their basic concepts. Today I will introduce a few more important points.

Channel

Starting a coroutine inside another coroutine is very convenient, but what if the two need to exchange messages? A Channel offers a simple way to send and receive data between coroutines:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<String>()
    launch {
        channel.send("apple")
    }
    println("I like ${channel.receive()}")
}

This closely resembles the producer-consumer model: the producer generates data and sends it into a buffer, while the consumer takes data out of the buffer. The interface definition, and the interfaces it inherits from, make this clear:

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    public companion object Factory {
        public const val UNLIMITED = Int.MAX_VALUE

        public const val RENDEZVOUS = 0

        public const val CONFLATED = -1
    }
}

Channel buffer

Channel is an interface with four different implementations; the factory function picks one depending on the buffer capacity:

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        else -> ArrayChannel(capacity)
    }

The default capacity of a Channel is 0 (RENDEZVOUS): send suspends the sending coroutine until a receiver takes the element. If the capacity is greater than 0, send suspends only once the buffer is full:

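import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>(2) // the buffer holds at most two elements
    launch {
        for (x in 1..5) {
            channel.send(x * x) // suspends while the buffer is full
            println("send $x")
        }
    }
    delay(200L)
    repeat(2) { println("receive ${channel.receive()}") }
    delay(100L) // give the sender time to refill the buffer
    coroutineContext.cancelChildren() // the sender is still suspended in send(); cancel it so main can return
}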

The output is shown below. After two elements have been sent, sending continues only once an element has been received:

2019-01-01 19:01:11.176 30809-30809/com.renny.kotlin I/System.out: send 1
2019-01-01 19:01:11.176 30809-30809/com.renny.kotlin I/System.out: send 2
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: receive 1
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: receive 4
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: send 3
2019-01-01 19:01:11.377 30809-30809/com.renny.kotlin I/System.out: send 4

This is the basic usage of Channel. If you are familiar with concurrent programming in Java, it will remind you of blocking queues. A Channel behaves like a blocking queue, except that it never blocks a thread: it suspends the coroutine instead. Unlike a blocking queue, it can also be closed.

Channel in detail

Channel inherits from SendChannel and ReceiveChannel and adds no logic of its own, so let’s look at some important methods of those two interfaces:

public fun offer(element: E): Boolean

offer also sends a message, but unlike send it returns a value and never suspends: when the Channel buffer is full it simply returns false. (Since kotlinx.coroutines 1.5, offer is deprecated in favor of trySend.)
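To make the difference from send concrete, here is a minimal sketch. It assumes kotlinx.coroutines 1.5 or later and therefore uses trySend, the successor of offer, which reports the outcome as a ChannelResult rather than a Boolean. The helper name fillWithoutSuspending is made up for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper (not part of the library): tries to put `count` elements
// into a channel of the given capacity without ever suspending, and records
// which attempts succeeded.
fun fillWithoutSuspending(capacity: Int, count: Int): List<Boolean> {
    val channel = Channel<Int>(capacity)
    // trySend is the non-suspending counterpart of send.
    return (1..count).map { channel.trySend(it).isSuccess }
}

fun main() {
    // Capacity 2: the first two attempts fit into the buffer, the third fails at once.
    println(fillWithoutSuspending(capacity = 2, count = 3)) // [true, true, false]
}
```

Because trySend is an ordinary (non-suspend) function, it can be called from outside a coroutine, which is why this sketch needs no runBlocking.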

public fun close(cause: Throwable? = null): Boolean

After a channel is closed, calling send or offer on it throws an exception. On the sending side, isClosedForSend tells you whether the channel has been closed. There is also isClosedForReceive, but it does not return true until all previously sent elements have been received.
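The following sketch walks through those flags step by step. The helper name closeDemo is hypothetical, and the @OptIn annotation is an assumption about newer library versions, where these two properties carry an opt-in warning; it can be dropped on older versions.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the value observed at each step.
@OptIn(DelicateCoroutinesApi::class)
fun closeDemo(): List<Boolean> = runBlocking {
    val flags = mutableListOf<Boolean>()
    val channel = Channel<Int>(2)
    channel.send(1)
    channel.send(2)
    channel.close()

    flags += channel.isClosedForSend    // true: no more sends are accepted
    flags += channel.isClosedForReceive // false: two elements are still buffered

    channel.receive()
    channel.receive()
    flags += channel.isClosedForReceive // true: the buffer has been drained

    // Sending after close() fails with ClosedSendChannelException.
    flags += try {
        channel.send(3)
        false
    } catch (e: ClosedSendChannelException) {
        true
    }
    flags
}

fun main() {
    println(closeDemo()) // [true, false, true, true]
}
```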

 public fun poll(): E?

Unlike receive, poll does not suspend the coroutine; if there is no message in the buffer it returns null. (Since kotlinx.coroutines 1.5, poll is deprecated in favor of tryReceive.)
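A short sketch of this non-suspending receive, assuming kotlinx.coroutines 1.5 or later and therefore using tryReceive, the successor of poll. The helper name pollWithoutSuspending is invented for the example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: buffers two elements, then polls three times
// without suspending.
fun pollWithoutSuspending(): List<Int?> {
    val channel = Channel<Int>(2)
    channel.trySend(1)
    channel.trySend(2)
    // tryReceive never suspends; getOrNull() yields null when nothing is
    // buffered, which mirrors what poll() returns.
    return List(3) { channel.tryReceive().getOrNull() }
}

fun main() {
    println(pollWithoutSuspending()) // [1, 2, null]
}
```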

public fun cancel(): Unit

cancel stops receiving and removes all elements from the buffer, so isClosedForReceive immediately returns true.
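As a rough illustration of the difference from close, the sketch below buffers two elements and then cancels the channel. The helper name cancelDemo is hypothetical; tryReceive assumes kotlinx.coroutines 1.5 or later, and the @OptIn only silences the opt-in warning that newer versions attach to isClosedForReceive.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: returns isClosedForReceive right after cancel(),
// together with what a non-suspending receive yields afterwards.
@OptIn(DelicateCoroutinesApi::class)
fun cancelDemo(): Pair<Boolean, Int?> {
    val channel = Channel<Int>(2)
    channel.trySend(1)
    channel.trySend(2)
    channel.cancel() // discards both buffered elements
    return channel.isClosedForReceive to channel.tryReceive().getOrNull()
}

fun main() {
    println(cancelDemo()) // (true, null)
}
```

With close() instead of cancel(), the two buffered elements would still be delivered before isClosedForReceive turned true.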

 public operator fun iterator(): ChannelIterator<E>

Because it returns an iterator, a Channel can also be read with a for loop, even though it is not a collection:

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // closing the channel ends the receiver's for loop
    }
    for (y in channel) println(y)
    println("Done!")
}

Advanced Channel usage

Merging of events

Note that Channel.CONFLATED is -1, even though a buffer capacity should be greater than or equal to 0. To see what this special value does, let’s modify the demo above:

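import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    launch {
        for (x in 1..5) {
            channel.send(x * x)
            println("send $x")
        }
    }
    delay(200L)
    // Only the first receive() returns; the second stays suspended because nothing more is sent.
    repeat(2) { println("receive ${channel.receive()}") }
}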

The output is as follows:

2019-01-01 20:10:29.922 1314-1314/com.renny.kotlin I/System.out: send 1
2019-01-01 20:10:29.922 1314-1314/com.renny.kotlin I/System.out: send 2
2019-01-01 20:10:29.927 1314-1314/com.renny.kotlin I/System.out: send 3
2019-01-01 20:10:29.927 1314-1314/com.renny.kotlin I/System.out: send 4
2019-01-01 20:10:29.928 1314-1314/com.renny.kotlin I/System.out: send 5
2019-01-01 20:10:30.117 1314-1314/com.renny.kotlin I/System.out: receive 25

send never suspended, yet we received only one message. With Channel.CONFLATED the buffer effectively holds one element, but when it is already occupied and a new message arrives, the new message replaces the old one. The receiver therefore always gets the latest message. What is this good for? Suppose clicking a button triggers an animation. Clicks that arrive while the animation is playing are merged into one: when the animation ends, only the most recent click starts a new animation, and the clicks in between are skipped.
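The click scenario can be simulated without any UI. The sketch below is only an illustration under assumed timings (five clicks 10 ms apart, a 200 ms "animation"); the helper name animatedClicks and the delays are invented for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical simulation: returns the clicks that actually got animated.
fun animatedClicks(): List<Int> = runBlocking {
    val clicks = Channel<Int>(Channel.CONFLATED)
    launch {
        for (click in 1..5) {
            clicks.send(click) // never suspends on a conflated channel
            delay(10L)         // assumed gap between two clicks
        }
        clicks.close()
    }
    val animated = mutableListOf<Int>()
    for (click in clicks) {
        animated += click
        delay(200L) // stand-in for the running animation
    }
    animated
}

fun main() {
    // Click 1 starts the first animation; clicks 2 to 4 are overwritten while
    // it runs, so only click 5 is left when the animation ends.
    println(animatedClicks()) // [1, 5]
}
```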

Extensions

produce and consumeEach are extension functions provided by the official library (both live in kotlinx.coroutines.channels). With them, we can rewrite the example without launching the sender coroutine by hand:

fun main() = runBlocking {
    val squares = produce { for (x in 1..5) send(x * x) }
    squares.consumeEach { println(it) }
    println("Done!")
}

async/await

async and await were introduced to make asynchronous tasks easier to write. Anyone familiar with JavaScript, C#, or similar languages will recognize them, and the usage is similar.

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    // Sequential: the second call starts only after the first has returned.
    var time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Sync completed in $time ms")

    // Concurrent: both calls start immediately and are awaited afterwards.
    time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Async completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L)
    return 29
}

The result is the same, but the second version takes half as long. We start the asynchronous work just as if we were calling synchronous code, which, I have to say, is much more elegant than doing it with plain Java:

2019-01-01 20:52:15.482 3520-3520/com.renny.kotlin I/System.out: The answer is 42
2019-01-01 20:52:15.483 3520-3520/com.renny.kotlin I/System.out: Sync completed in 2006 ms
2019-01-01 20:52:16.489 3520-3520/com.renny.kotlin I/System.out: The answer is 42
2019-01-01 20:52:16.489 3520-3520/com.renny.kotlin I/System.out: Async completed in 1006 ms

async is an extension function on CoroutineScope that starts a child coroutine:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Instead of a Job, async returns a Deferred. The relationship is like that between Runnable and Callable: a Deferred is a Job that also carries a result, and otherwise the two are much the same. await suspends the current coroutine until the child coroutine finishes and then returns its result, much like join.
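A small side-by-side sketch of the two may help. The helper name joinVersusAwait is hypothetical, and the values 13 and 29 are simply reused from the example above.

```kotlin
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs one computation via launch and one via async.
fun joinVersusAwait(): Pair<Int, Int> = runBlocking {
    var viaLaunch = 0
    // launch returns a Job: it carries no result, so the value has to be
    // passed out through a side effect.
    val job: Job = launch {
        delay(100L)
        viaLaunch = 13
    }
    // async returns a Deferred<Int>: the result travels with the coroutine.
    val deferred: Deferred<Int> = async {
        delay(100L)
        29
    }
    job.join()                      // suspends until the job completes, returns Unit
    val viaAsync = deferred.await() // suspends until completion, returns the result
    viaLaunch to viaAsync
}

fun main() {
    println(joinVersusAwait()) // (13, 29)
}
```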

Summary

That’s all for today. As the title suggests, the purpose of these articles is to help you get a first look at coroutines quickly and easily. Like RxJava, coroutines are a library Kotlin provides to handle asynchronous tasks better. The ultimate goal is to use them in network requests, database access, file I/O, and so on, to make the code more concise and elegant.