Kotlin Coroutine Channels: Producers and Consumers

A Channel is really the producer-consumer model applied to coroutines. Take the functionality you used to implement with BlockingQueue and rewrite it with a Channel, and you might discover something new.

1. Meet the Channel


A Channel is essentially a concurrency-safe queue that connects coroutines so that they can communicate with each other. Without further ado, let’s look at an example:

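    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.Channel

    // Logger is a logging helper that is not defined in this article; println works just as well.
    suspend fun main() {
        val channel = Channel<Int>()

        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                channel.send(i++)
                delay(1000)
            }
        }

        val consumer = GlobalScope.launch {
            while (true) {
                val element = channel.receive()
                Logger.debug(element)
            }
        }

        producer.join()
        consumer.join()
    }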

We built two coroutines, called producer and consumer. We did not specify a dispatcher, so both use the default one, which on the Java virtual machine is backed by the familiar thread pool: they may run on different threads or on the same thread.

In the example, the producer sends a number to the Channel every second, while the consumer keeps reading from the Channel and prints each number. The sender is clearly slower than the receiver. When there is no value to read, receive suspends until a new element is sent, so receive is a suspend function. What about send?
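To watch this exchange without an endless loop, here is a minimal bounded sketch. It assumes kotlinx.coroutines is on the classpath and uses runBlocking instead of GlobalScope so that the function can return a result; the name exchangeThree is purely illustrative.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a bounded variant of the producer/consumer pair above.
fun exchangeThree(): List<Int> = runBlocking {
    val channel = Channel<Int>()

    // Producer: sends 0, 1, 2, pausing between sends.
    val producer = launch {
        for (i in 0 until 3) {
            channel.send(i)
            delay(100)
        }
    }

    // Consumer: each receive() suspends until the producer has sent a value.
    val received = List(3) { channel.receive() }
    producer.join()
    received
}
```

Calling exchangeThree() returns the three numbers in the order they were sent; the consumer never spins, it is simply suspended between elements.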

2. Capacity of the Channel


If you write this code in your IDE, you’ll find that send is also a suspend function. Why would the sender suspend? Think of the BlockingQueue we already know. An element added to it takes up space in the queue, and if the queue has no space left there are two options: 1. block and wait for space to free up; 2. throw an exception and refuse the element. send faces the same problem. If a Channel is really a queue, shouldn’t it have a buffer? Once that buffer is full and nobody has called receive to take an element out, send suspends. So let’s look at how the Channel buffer is defined:

    public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
        when (capacity) {
            RENDEZVOUS -> RendezvousChannel()
            UNLIMITED -> LinkedListChannel()
            CONFLATED -> ConflatedChannel()
            else -> ArrayChannel(capacity)
        }

To construct a Channel we call a function named Channel. Hmm, that is indeed not a constructor: in Kotlin we can define a top-level function with the same name as a class and let it pose as a constructor, though it is essentially a factory function.
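The same trick can be sketched with standard-library Kotlin only. The Mailbox interface and its two implementations below are hypothetical names invented for illustration; they are not part of kotlinx.coroutines.

```kotlin
// Hypothetical types for illustration; not the library's real classes.
interface Mailbox<E> {
    val capacity: Int
}

private class EmptyMailbox<E> : Mailbox<E> {
    override val capacity = 0
}

private class SizedMailbox<E>(override val capacity: Int) : Mailbox<E>

// A top-level function named like the interface: call sites read like a constructor call,
// but the function is free to pick a different implementation for each argument.
fun <E> Mailbox(capacity: Int = 0): Mailbox<E> =
    when (capacity) {
        0 -> EmptyMailbox()
        else -> SizedMailbox(capacity)
    }
```

A caller writes Mailbox<String>() or Mailbox<Int>(4) and never sees the concrete classes, which is the same shape as the Channel factory shown above.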

The default, RENDEZVOUS, is 0. The word describes a meeting where neither side leaves until the other shows up: if you don’t come to receive, my send just stays suspended here waiting. In other words, if the consumer does not receive, the first send in the producer is suspended:

    val producer = GlobalScope.launch {
        var i = 0
        while (true) {
            i++ // increment first to make the log output easier to follow
            Logger.debug("before send $i")
            channel.send(i)
            Logger.debug("before after $i")
            delay(1000)
        }
    }

    val consumer = GlobalScope.launch {
        while (true) {
            delay(2000) // slow receiver; the 2 s value is inferred from the log timestamps below
            val element = channel.receive()
            Logger.debug(element)
        }
    }

We deliberately slow down the receiving end, and you’ll find that send stays suspended until receive is called:

    07:11:23:119 [DefaultDispatcher-worker-2 @coroutine#1] before send 1
    07:11:24:845 [DefaultDispatcher-worker-2 @coroutine#2] 1
    07:11:24:846 [DefaultDispatcher-worker-2 @coroutine#1] before after 1
    07:11:25:849 [DefaultDispatcher-worker-4 @coroutine#1] before send 2
    07:11:26:850 [DefaultDispatcher-worker-2 @coroutine#2] 2
    07:11:26:850 [DefaultDispatcher-worker-3 @coroutine#1] before after 2
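The rendezvous behavior can also be checked without reading timestamps. The sketch below records events in a list instead of logging them; it assumes kotlinx.coroutines and runs everything inside runBlocking, and rendezvousEvents is a made-up name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order of events around a rendezvous send.
fun rendezvousEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>() // RENDEZVOUS: no buffer at all

    val producer = launch {
        events += "before send"
        channel.send(1) // suspends here until someone receives
        events += "after send"
    }

    delay(200) // the receiver deliberately shows up late
    events += "about to receive"
    channel.receive()
    producer.join()
    events
}
```

The "after send" entry can only be recorded once the receiver has arrived, which is the same pattern as in the log above.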

UNLIMITED is easy to understand: it turns nothing away. Judging from its implementation, LinkedListChannel, it works much like our LinkedBlockingQueue.

CONFLATED literally means merged; the word shares a root with inflate. Does that mean that if I send 1, 2, 3, 4, 5, the other side receives a collection [1, 2, 3, 4, 5]? That is what the literal meaning suggests. In fact, this Channel keeps only the last element. It does not merge, it replaces: a Channel of this type has a buffer of exactly one element, and every new element overwrites the old one. So if I send 1, 2, 3, 4, 5 and only then start receiving, all I get is 5.

That leaves ArrayChannel, which takes a value as its buffer capacity, similar to ArrayBlockingQueue.
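The four capacity modes can be compared side by side with a small sketch. It relies on trySend and tryReceive, the non-suspending counterparts of send and receive that exist in recent kotlinx.coroutines releases (1.5 and later) and are not used elsewhere in this article; the helper names are hypothetical.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: how many of `attempts` elements does a channel of the given capacity
// accept while nobody is receiving? trySend never suspends; it reports failure instead.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    return (1..attempts).count { channel.trySend(it).isSuccess }
}

// Hypothetical helper: what a CONFLATED channel still holds after five sends.
fun conflatedKeeps(): Int? {
    val channel = Channel<Int>(Channel.CONFLATED)
    for (i in 1..5) channel.trySend(i) // each element overwrites the previous one
    return channel.tryReceive().getOrNull()
}
```

A rendezvous channel accepts nothing without a receiver, a channel of capacity 3 accepts three elements, and UNLIMITED and CONFLATED accept everything, although the conflated one remembers only the last element.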

3. Iterating over a Channel


Earlier we used while(true) for both sending and reading, because we wanted to keep reading and writing. A Channel itself behaves somewhat like a sequence that can be read one element at a time, so when reading we can obtain the Channel’s iterator directly:

    val consumer = GlobalScope.launch {
        val iterator = channel.iterator()
        while (iterator.hasNext()) { // suspension point
            val element = iterator.next()
            Logger.debug(element)
            delay(2000)
        }
    }

Here iterator.hasNext() is a suspend function: to determine whether there is a next element it actually has to read one from the Channel. This can be simplified to a for-in loop:

    val consumer = GlobalScope.launch {
        for (element in channel){
            Logger.debug(element)
            delay(2000)
        }
    }
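A short sketch can confirm that the two loop styles read the same elements. It assumes kotlinx.coroutines; the helper names are invented, and the channel is pre-filled and closed so that both loops terminate.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: explicit iterator, as in the first version above.
suspend fun drainWithIterator(channel: ReceiveChannel<Int>): List<Int> {
    val result = mutableListOf<Int>()
    val iterator = channel.iterator()
    while (iterator.hasNext()) result += iterator.next()
    return result
}

// Hypothetical helper: the for-in form, which calls the same iterator behind the scenes.
suspend fun drainWithFor(channel: ReceiveChannel<Int>): List<Int> {
    val result = mutableListOf<Int>()
    for (element in channel) result += element
    return result
}

// Hypothetical helper: a channel holding 0, 1, 2 that is already closed for sending.
fun filledChannel(): Channel<Int> =
    Channel<Int>(Channel.UNLIMITED).apply {
        repeat(3) { trySend(it) }
        close()
    }

fun bothLoopsAgree(): Boolean = runBlocking {
    drainWithIterator(filledChannel()) == drainWithFor(filledChannel())
}
```

Both loops stop when hasNext() reports that the channel is closed and empty; on a channel that is never closed they would stay suspended instead.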

4. produce and actor


Earlier we implemented a simple producer-consumer example by defining a Channel outside the coroutines and accessing it from inside them. Is there a more convenient way to construct a producer and a consumer?

    val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
        while (true) {
            delay(2000)
            send(2)
        }
    }

We can start a producer coroutine with produce, which returns a ReceiveChannel that other coroutines can use to receive data. Conversely, we can start a consumer coroutine with actor:

    val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
        while (true) {
            val element = receive()
        }
    }

ReceiveChannel and SendChannel are both parent interfaces of Channel. The former defines receive and the latter defines send, so a Channel can do both. produce and actor, like launch, are called coroutine builders. A coroutine started by one of these builders is naturally bound to the returned Channel, so the Channel is closed automatically when the coroutine ends. For example, produce constructs a ProducerCoroutine object:

    internal open class ProducerCoroutine<E>(
        parentContext: CoroutineContext, channel: Channel<E>
    ) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
        ...
        override fun onCompleted(value: Unit) {
            _channel.close() // when the coroutine completes
        }

        override fun onCancelled(cause: Throwable, handled: Boolean) {
            val processed = _channel.close(cause) // when the coroutine is cancelled
            if (!processed && !handled) handleCoroutineException(context, cause)
        }
    }

Notice that in both coroutine completion and cancellation method calls, the corresponding _channel is closed.
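The automatic close is easy to observe from the outside. In the sketch below, which assumes kotlinx.coroutines, the producer block never calls close(), yet the for loop over the returned channel still terminates. collectProduced is an illustrative name, and the @OptIn annotation is there only because produce has been marked experimental in some library versions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects everything a short-lived producer sends.
@OptIn(ExperimentalCoroutinesApi::class)
fun collectProduced(): List<Int> = runBlocking {
    val numbers = produce<Int> {
        for (i in 1..3) send(i)
        // No explicit close(): the channel is closed when this block completes.
    }

    val received = mutableListOf<Int>()
    for (n in numbers) received += n // ends only because the channel gets closed
    received
}
```

If the channel were not closed on completion, the loop would suspend forever after the third element.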

That looks handy. At the time of writing, however, neither produce nor actor is stable: the former is still marked ExperimentalCoroutinesApi and the latter ObsoleteCoroutinesApi, which is awkward and makes both hard to recommend. The issue discussion linked from the actor documentation also shows that, compared with a full actor-based concurrency framework, the actor API in Kotlin coroutines amounts to little more than a coroutine that returns a SendChannel. The coroutine maintainers do intend to implement a more sophisticated actor API, but the clear priority in this period has been Flow, which entered public preview in kotlinx.coroutines 1.2 and became stable in 1.3. We’ll cover it in a later article.

Although produce has not been marked ObsoleteCoroutinesApi, as the other half of actor it is unlikely to be promoted to stable on its own. My advice for both APIs is simply to be aware of them.

5. Close the Channel


Earlier we mentioned that both produce and actor return channels that close when the corresponding coroutine completes execution. Oh, it turns out that a Channel has a closing concept.

A Channel is different from Flow, which we will discuss in the next article: it is live, a hot data source. In other words, to receive data someone has to be sending it on the other side, just like messaging on WeChat. When we call close on a Channel, it immediately stops accepting new elements, meaning isClosedForSend returns true right away. Because of the Channel’s buffer, however, isClosedForReceive does not return true until all the buffered elements have been read.

    val channel = Channel<Int>(3)

    val producer = GlobalScope.launch {
        List(5) {
            channel.send(it)
            Logger.debug("send $it")
        }
        channel.close()
        Logger.debug("close channel. ClosedForSend = ${channel.isClosedForSend} ClosedForReceive = ${channel.isClosedForReceive}")
    }

    val consumer = GlobalScope.launch {
        for (element in channel) {
            Logger.debug("receive: $element")
            delay(1000)
        }

        Logger.debug("After Consuming. ClosedForSend = ${channel.isClosedForSend} ClosedForReceive = ${channel.isClosedForReceive}")
    }

Let’s change the example a bit and create a Channel with a buffer size of 3. The producer coroutine sends its elements quickly and closes the Channel after sending 5 of them, while the consumer coroutine reads one element per second. The result is as follows:

11:05:20:678 [DefaultDispatcher-worker-1] send 0
11:05:20:678 [DefaultDispatcher-worker-3] receive:0
11:05:20:678 [DefaultDispatcher-worker-1] send 1
11:05:20:678 [DefaultDispatcher-worker-1] send 2
11:05:20:678 [DefaultDispatcher-worker-1] send 3
11:05:21:688 [DefaultDispatcher-worker-3] receive:1
11:05:21:688 [DefaultDispatcher-worker-3] send 4
11:05:21:689 [DefaultDispatcher-worker-3] close channel.ClosedForSend = true ClosedForReceive = false
11:05:22:693 [DefaultDispatcher-worker-3] receive:2
11:05:23:694 [DefaultDispatcher-worker-4] receive:3
11:05:24:698 [DefaultDispatcher-worker-4] receive:4
11:05:25:700 [DefaultDispatcher-worker-4] After Consuming.ClosedForSend = true ClosedForReceive = true
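The two halves of this behavior, rejecting new elements immediately while still delivering buffered ones, can be checked in a compact sketch. It assumes kotlinx.coroutines and again uses trySend (available in 1.5 and later) to probe the closed channel without throwing; drainAfterClose is a hypothetical name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (elements read after close, whether a late send was rejected).
fun drainAfterClose(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = Channel<Int>(3)
    repeat(3) { channel.send(it) } // fits in the buffer, so nothing suspends
    channel.close()

    // After close() the channel accepts nothing new...
    val lateSendRejected = channel.trySend(99).isClosed

    // ...but buffered elements can still be read; the loop ends once they are drained.
    val drained = mutableListOf<Int>()
    for (element in channel) drained += element
    drained to lateSendRejected
}
```

A plain send on the closed channel would throw instead of returning a result, which is why the sketch probes with trySend.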

Let’s explore the significance of Channel closure.

When we talk about closing, we tend to think of I/O: if you don’t close a stream, you may leak resources. As mentioned above, the resource inside a Channel is essentially a buffer, which is just a linear list, a piece of memory. If we create a Channel and never close it, we do not actually leak anything: once the sender has finished sending, it can simply forget about the Channel. That seems fine, doesn’t it?

But this is awkward for the receiver, which cannot tell whether more data is coming. If the Channel were WeChat, the receiver would open the chat window and see “the other party is typing” forever, and end up waiting alone for the rest of its life. So closing here is more of a convention:

Woman: We don’t have a chance. Don’t wait.

Man: oh. (Your message was not sent successfully)

So who should close the Channel? In ordinary one-way communication, as with a leader giving a speech, the speaker is the one who says “I’ve finished”; you cannot say it on the leader’s behalf before the speech is over. For one-way communication, then, the sender should close the Channel. For two-way communication, closing has to be negotiated. Technically both ends are equal, but in business scenarios they usually are not, so it is recommended that the dominant party handle the close.

There are more complicated cases. The examples so far were all one-to-one, but there are also one-to-many and many-to-many scenarios. There is still a dominant party in those, and the life cycle of the Channel is best maintained by it. The fan-in and fan-out examples in the official documentation are exactly this kind of case.

You may not be familiar with the terms fan-in and fan-out, and explanations found online are not very accessible. Picture a folding fan: the ribs converging on the pivot are the fan-in. If that pivot is one end of a communication, it is the receiver; if it is a function, it is the callee. The larger the fan-in, the more the module is reused: the more places call a function, the higher its degree of reuse. Fan-out is the reverse and describes complexity, for example a Model that calls into the network module, the database, files, and so on.
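A one-to-many setup with the sender as the dominant party might look like the sketch below. It assumes kotlinx.coroutines; fanOut and its parameters are illustrative names, and the shared list is safe only because runBlocking keeps all the coroutines on one thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: one sender, several workers reading from the same channel.
fun fanOut(workerCount: Int = 3, elementCount: Int = 6): List<Int> = runBlocking {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>() // single-threaded here, so no extra synchronization

    val workers = List(workerCount) {
        launch { for (element in channel) received += element }
    }

    for (i in 1..elementCount) channel.send(i)
    channel.close() // the sender is the dominant party, so it closes the channel
    workers.joinAll()
    received.sorted()
}
```

Each element shows up exactly once in the result no matter which worker took it, and every worker loop ends because the sender closed the channel.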

6. BroadcastChannel


We mentioned the one-to-many case above. As far as the data itself is concerned, even with multiple receivers each element is read by only one of them. Broadcast is different: there is no mutual exclusion between the receivers.

Creating a BroadcastChannel does not look much different from creating an ordinary Channel:

    val broadcastChannel = BroadcastChannel<Int>(5)

To subscribe, just call:

    val receiveChannel = broadcastChannel.openSubscription()

So we get a ReceiveChannel, and we just need to call its receive to get the subscription message.

Let’s look at a more complete example, in which the sender sends five elements while three coroutines are started at the same time to receive the broadcast:

    val producer = GlobalScope.launch {
        List(5) {
            broadcastChannel.send(it)
            Logger.debug("send $it")
        }
        broadcastChannel.close()
    }

    List(3) { index ->
        GlobalScope.launch {
            val receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                Logger.debug("[$index] receive: $element")
                delay(1000)
            }
        }
    }.forEach { it.join() }

    producer.join()

The output is as follows:

    12:34:59:656 [DefaultDispatcher-worker-6]  [2] receive: 0
    12:34:59:656 [DefaultDispatcher-worker-3]  [1] receive: 0
    12:34:59:656 [DefaultDispatcher-worker-5]  [0] receive: 0
    12:34:59:656 [DefaultDispatcher-worker-7]  send 0
    12:34:59:657 [DefaultDispatcher-worker-7]  send 1
    12:34:59:658 [DefaultDispatcher-worker-7]  send 2
    12:35:00:664 [DefaultDispatcher-worker-3]  [0] receive: 1
    12:35:00:664 [DefaultDispatcher-worker-5]  [1] receive: 1
    12:35:00:664 [DefaultDispatcher-worker-6]  [2] receive: 1
    12:35:00:664 [DefaultDispatcher-worker-8]  send 3
    12:35:01:669 [DefaultDispatcher-worker-8]  [0] receive: 2
    12:35:01:669 [DefaultDispatcher-worker-3]  [1] receive: 2
    12:35:01:669 [DefaultDispatcher-worker-6]  [2] receive: 2
    12:35:01:669 [DefaultDispatcher-worker-8]  send 4
    12:35:02:674 [DefaultDispatcher-worker-8]  [0] receive: 3
    12:35:02:674 [DefaultDispatcher-worker-7]  [1] receive: 3
    12:35:02:675 [DefaultDispatcher-worker-3]  [2] receive: 3
    12:35:03:678 [DefaultDispatcher-worker-8]  [1] receive: 4
    12:35:03:678 [DefaultDispatcher-worker-3]  [0] receive: 4
    12:35:03:678 [DefaultDispatcher-worker-1]  [2] receive: 4

The point to focus on here is that every receiving coroutine reads every element.

The log order does not reflect the actual read and write order very directly; if you run it again, the order may differ.

Instead of creating a BroadcastChannel directly, we can also convert the ordinary Channel we defined earlier:

    val channel = Channel<Int>()
    val broadcast = channel.broadcast(3)

The parameter is the size of the buffer.

In fact, the BroadcastChannel obtained this way can be regarded as cascaded from the original Channel. The source code of the broadcast extension shows this clearly:

    fun <E> ReceiveChannel<E>.broadcast(
        capacity: Int = 1,
        start: CoroutineStart = CoroutineStart.LAZY
    ): BroadcastChannel<E> =
        GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
            for (e in this@broadcast) { // this actually reads from the original Channel
                send(e)
            }
        }

Oh, so for BroadcastChannel the library also offers a builder similar to produce and actor: CoroutineScope.broadcast starts a coroutine directly and returns a BroadcastChannel.

Note that converting the original Channel into a BroadcastChannel is itself a read operation on the original Channel. If other coroutines are also reading the original Channel, they compete with the BroadcastChannel for its elements.

In addition, most BroadcastChannel-related APIs are marked ExperimentalCoroutinesApi, so they may still change.

7. Channel version of the sequence generator


In the previous article we talked about sequences, whose generators are based on the standard library’s coroutine API. In fact, channels themselves can also be used to generate sequences. For example:

    val channel = GlobalScope.produce(Dispatchers.Unconfined) {
        Logger.debug("A")
        send(1)
        Logger.debug("B")
        send(2)
        Logger.debug("Done")
    }

    for (item in channel) {
        Logger.debug("Got $item")
    }

The coroutine created by produce returns a Channel with a buffer size of 0. To make the problem easier to describe, we pass in the Dispatchers.Unconfined dispatcher, which means the coroutine runs immediately up to its first suspension point. So it outputs A right away and suspends at send(1) until the subsequent for loop reads the first value, which is really a call to hasNext on the channel’s iterator. hasNext checks whether there is a next element and is itself a suspend function. In doing so it lets the coroutine resume from where it was suspended at send(1), so we see log B, and the coroutine then suspends at send(2). Only then does hasNext return, and the for loop finally prints the first element, and so on. The output is as follows:

    22:33:56:073 [main @coroutine#1] A
    22:33:56:172 [main @coroutine#1] B
    22:33:56:173 [main]  Got 1
    22:33:56:173 [main @coroutine#1] Done
    22:33:56:176 [main]  Got 2
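The same interleaving can be captured in a list rather than read off a log. This is a sketch under assumptions: it uses kotlinx.coroutines on the JVM, runs inside runBlocking rather than a suspend main with GlobalScope, and unconfinedEvents is a made-up name. With a different dispatcher the recorded order may change.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records producer and consumer events in the order they happen.
@OptIn(ExperimentalCoroutinesApi::class)
fun unconfinedEvents(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // Unconfined: the producer runs in the caller's thread up to its first suspension point,
    // and is resumed in place when a receiver takes the pending element.
    val channel = produce<Int>(Dispatchers.Unconfined) {
        events += "A"
        send(1)
        events += "B"
        send(2)
        events += "Done"
    }

    for (item in channel) events += "Got $item"
    events
}
```

The producer entries are written from inside the hasNext call that resumes it, which is why each of them lands in the list before the matching "Got" entry.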

We see that B is output before Got 1, and likewise Done before Got 2. This may seem counterintuitive, but it is exactly the order in which suspension and resumption are executed. The key point is that the hasNext call mentioned above triggers the coroutine to continue from its suspension point. With a different dispatcher, other orderings are of course possible. In any case, we have used a Channel to simulate a sequence. The equivalent code written with sequence looks like this:

    val sequence = sequence { 
        Logger.debug("A") 
        yield(1) 
        Logger.debug("B") 
        yield(2) 
        Logger.debug("Done")
    }
    
    Logger.debug("before sequence")
    
    for (item in sequence) { 
        Logger.debug("Got $item")
    }

The sequence’s execution order is much more intuitive: it has no concept of a dispatcher, and the iterator’s hasNext and next are not suspend functions. hasNext still triggers the lookup of the next element, which runs the sequence’s internal logic, so this time it is hasNext that triggers the output of A. yield then supplies 1 as the first element of the sequence. The complete output is as follows:

    22:33:55:600 [main]  A
    22:33:55:603 [main]  Got 1
    22:33:55:604 [main]  B
    22:33:55:604 [main]  Got 2
    22:33:55:604 [main]  Done

Sequence is essentially implemented based on the standard library coroutine API, without the scope of the upper-level coroutine framework and the concept of jobs.

So we can switch between different schedulers to generate elements in the Channel example, for example:

    val channel = GlobalScope.produce(Dispatchers.Unconfined) {
        Logger.debug(1)
        send(1)
        withContext(Dispatchers.IO) {
            Logger.debug(2)
            send(2)
        }
        Logger.debug("Done")
    }

A sequence cannot do this.

Of course, using a Channel as a sequence generator is a bit of overkill. The point is simply to show that it is possible, so that you can reach for it when the right situation comes up.

8. Internal structure of a Channel


Earlier we mentioned that sequence does not benefit from the concepts of the higher-level coroutine framework, and a sequence is clearly not thread-safe, whereas a Channel can be used in concurrent scenarios.

Channel buffers come in two versions, backed by a linked list and by an array. The linked-list version is mainly defined in AbstractSendChannel:

    internal abstract class AbstractSendChannel<E> : SendChannel<E> { 
        protected val queue = LockFreeLinkedListHead()
        ...
    }

LockFreeLinkedListHead is itself a doubly linked list node. The Channel joins it head to tail into a circular list, with queue acting as the sentinel node. A new element is inserted just before queue, which in effect appends it to the end of the whole list.

The so-called lock-free behavior is implemented on the Java virtual machine with atomic reads and writes. For a linked list, all that needs to change are the references to the previous and next nodes:

    public actual open class LockFreeLinkedListNode {
        private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
        private val _prev = atomic<Any>(this) // Node | Removed
        ...
    }

Its implementation is based on a lock-free linked list described in a paper. A CAS operation can usually modify only one reference, so it does not directly cover cases where the references of neighboring nodes must change atomically together. Inserting into a singly linked list, for example, modifies two references: the next of the node before the insertion point and the next of the new node itself. With Head -> A -> B -> C, inserting X between A and B first sets X -> B and then A -> X. If A is removed in the meantime, X may be lost along with it, leaving Head -> B -> C.

The lock-free list solves this with the help of prev. While A is being removed we can still set X.next = B and X.prev = A. We then check whether A has been removed: if so, B.prev, which used to be A, has become Head, and the list can be repaired by assigning B.prev to X.prev. The process is somewhat involved; see the implementation of LockFreeLinkedListNode on the JVM for the details.
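To make the "one CAS changes one reference" constraint concrete, here is a much simpler structure than the one the library uses: a Treiber stack built on java.util.concurrent.atomic.AtomicReference. It is a swapped-in illustration and not the LockFreeLinkedListNode algorithm; all names are hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical illustration: a Treiber stack, NOT the algorithm used by LockFreeLinkedListNode.
class LockFreeStack<T> {
    private class Node<T>(val value: T, val next: Node<T>?)

    private val head = AtomicReference<Node<T>?>(null)

    fun push(value: T) {
        while (true) {
            val current = head.get()
            val node = Node(value, current)
            // CAS swaps exactly one reference; if another thread changed head first, retry.
            if (head.compareAndSet(current, node)) return
        }
    }

    fun pop(): T? {
        while (true) {
            val current = head.get() ?: return null
            if (head.compareAndSet(current, current.next)) return current.value
        }
    }
}

// Hypothetical helper: pushes from several threads, then counts what arrived.
fun concurrentPushCount(threads: Int = 4, perThread: Int = 1000): Int {
    val stack = LockFreeStack<Int>()
    List(threads) { t -> thread { repeat(perThread) { stack.push(t * perThread + it) } } }
        .forEach { it.join() }
    var count = 0
    while (stack.pop() != null) count++
    return count
}
```

A stack gets away with a single CAS because only head ever changes. A doubly linked list has to keep next and prev consistent across several nodes, which is where the repair step described above comes in.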

For the array version, ArrayChannel is a bit cruder: it simply holds an array internally:

    // If the buffer size is larger than 8, an array of size 8 is allocated first and expanded later
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))

The array is directly locked using a ReentrantLock.
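The lock-around-an-array approach can be sketched as follows. This is a simplified, non-suspending bounded buffer written for illustration; the real ArrayChannel also has to manage suspended senders and receivers, and LockedRingBuffer is a hypothetical name.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical illustration: a fixed-size ring buffer in which every access takes the same lock.
class LockedRingBuffer<E : Any>(private val capacity: Int) {
    private val lock = ReentrantLock()
    private val buffer = arrayOfNulls<Any?>(capacity)
    private var head = 0 // index of the oldest element
    private var size = 0

    // Returns false instead of suspending when the buffer is full.
    fun offer(element: E): Boolean = lock.withLock {
        if (size == capacity) return false
        buffer[(head + size) % capacity] = element
        size++
        true
    }

    // Returns null when the buffer is empty.
    @Suppress("UNCHECKED_CAST")
    fun poll(): E? = lock.withLock {
        if (size == 0) return null
        val element = buffer[head] as E
        buffer[head] = null
        head = (head + 1) % capacity
        size--
        element
    }
}
```

Holding one lock for every read and write is simple and correct, but producers and consumers serialize on it.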

Is there room for optimization here? We could in fact use CAS to read and write the array elements as well. If you’re interested, see the ConcurrentHashMap implementation: JDK 7 uses Unsafe CAS reads and writes on the segment array, and JDK 8 does the same for the bucket array.

The implementation of coroutines on JS and Native is much simpler, because coroutines there run on a single thread and there is little concurrency to deal with.

9. Summary


It is fair to say that Channel breathes a soul into coroutines. Individual coroutines are no longer isolated, and Channels let them collaborate much more easily. The concept of a Channel was not invented by Kotlin: it exists in Golang, and Java NIO has one too. That naturally brings multiplexing to mind. Can we still simply suspend when multiplexing? And if not, what do we do instead? We’ll break that down next time.

