Preface

Earlier articles covered: What is a coroutine? How does a coroutine switch threads?

Besides CoroutineContext, CoroutineScope, and the other topics introduced above, coroutines have two more important concepts: Flow (a cold stream) and Channel (a hot stream). This article studies both in depth and covers the following:

  1. What a Channel is and its basic use
  2. How Channel works
  3. Basic use of Flow
  4. How Flow works

1. Basic use of Channel

1.1 What is a Channel?

A Channel is essentially a queue. It plays the role of a BlockingQueue for coroutines: it transfers data between multiple coroutines and is used for inter-coroutine communication.

1.2 Implementing the producer-consumer model with Channel

The producer-consumer pattern in traditional Java is simple: one or more producer threads, a shared blocking queue (commonly ArrayBlockingQueue or LinkedBlockingQueue), and one or more consumer threads. Producers keep putting data into the blocking queue, and consumers loop, taking elements out of the queue to consume them.

So how do you implement the producer-consumer pattern with channels?

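    import android.util.Log
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.Channel

    // `tag` is assumed to be a String defined elsewhere ("ProduceAndConsume" in the output below)
    fun produceAndConsume() {
        GlobalScope.launch {
            val channel = Channel<Int>()
            val producer = GlobalScope.launch {
                var i = 0
                while (true) {
                    Log.i(tag, "Producer produced: $i")
                    channel.send(i++)
                    delay(1000)
                }
            }
            val consumer = GlobalScope.launch {
                while (true) {
                    val element = channel.receive()
                    Log.i(tag, "Consumer consumed: $element")
                }
            }
            producer.join()
            consumer.join()
        }
    }
    // Output:
    // I/ProduceAndConsume: Producer produced: 0
    // I/ProduceAndConsume: Consumer consumed: 0
    // I/ProduceAndConsume: Producer produced: 1
    // I/ProduceAndConsume: Consumer consumed: 1
    // I/ProduceAndConsume: Producer produced: 2
    // I/ProduceAndConsume: Consumer consumed: 2
    // I/ProduceAndConsume: Producer produced: 3
    // I/ProduceAndConsume: Consumer consumed: 3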
  1. As you can see, implementing the producer-consumer pattern with Channel is simple.
  2. The producer and consumer run alternately: after producing, the producer suspends if the buffer is full, and the consumer suspends if the buffer is empty.

1.3 Buffer Capacity

We mentioned above that the producer suspends when the buffer is full, so what is the buffer capacity?

    public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
        when (capacity) {
            RENDEZVOUS -> RendezvousChannel()
            UNLIMITED -> LinkedListChannel()
            CONFLATED -> ConflatedChannel()
            else -> ArrayChannel(capacity)
        }

As you can see, the capacity argument passed at construction selects one of four implementations:

  1. RendezvousChannel has a buffer of size 0; after sending, the sender stays suspended until someone receives the element.
  2. LinkedListChannel has an unlimited buffer, so sending never suspends.
  3. ConflatedChannel buffers a single element; every time a new element arrives it replaces the old one.
  4. ArrayChannel has a fixed-size buffer of the given capacity; sending suspends once it is full.
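The effect of each capacity can be probed without suspending by using trySend, which reports whether an element was accepted immediately. The following is a minimal sketch, assuming kotlinx.coroutines 1.5 or later (where trySend and tryReceive are available). The helper names acceptedWithoutReceiver and latestFromConflated are made up for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many of `attempts` elements a channel of the
// given capacity accepts immediately when nobody is receiving.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    for (i in 1..attempts) {
        // trySend never suspends; it fails if the element cannot be placed right now
        if (channel.trySend(i).isSuccess) accepted++
    }
    return accepted
}

// Hypothetical helper: a conflated channel keeps only the newest element.
fun latestFromConflated(): Int? {
    val channel = Channel<Int>(Channel.CONFLATED)
    for (i in 1..5) channel.trySend(i)
    return channel.tryReceive().getOrNull()
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // 0: no buffer and no receiver
    println(acceptedWithoutReceiver(2))                  // 2: fixed-size buffer fills up
    println(acceptedWithoutReceiver(Channel.UNLIMITED))  // 5: unbounded buffer
    println(acceptedWithoutReceiver(Channel.CONFLATED))  // 5: each send overwrites the last
    println(latestFromConflated())                       // 5: only the newest survives
}
```

With the suspending send, the cases where trySend fails are exactly the cases where the sender would be suspended instead.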

2. How Channel works

2.1 Analysis of the send and receive process

As described above, the producer and consumer run alternately: after producing, the producer is suspended if the buffer is full, and the consumer is suspended if the buffer is empty. The process is as follows:

  1. If the queue contains a Send element when receive is called, the suspended send coroutine is resumed asynchronously.
  2. If the queue contains no Send element when receive is called, the receive coroutine is suspended.
  3. If the queue contains a Receive element when send is called, the suspended receive coroutine is resumed asynchronously.
  4. If the queue contains no Receive element when send is called, the send coroutine is suspended.
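The suspend-and-resume handshake above can be observed with a rendezvous channel. This is a minimal sketch, assuming kotlinx.coroutines is on the classpath and everything runs on the single thread of runBlocking. The function name rendezvousOrder and the event strings are made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: records the order of events around one send/receive pair.
fun rendezvousOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>() // RENDEZVOUS: no buffer

    val sender = launch {
        events += "send start"
        channel.send(1) // no receiver is waiting yet, so this suspends
        events += "send resumed"
    }

    yield() // let the sender run until it suspends inside send()
    events += "receiver arrives"
    val value = channel.receive() // finds the waiting sender and resumes it
    events += "received $value"

    sender.join() // the resumed sender finishes here
    events
}

fun main() {
    rendezvousOrder().forEach(::println)
}
```

The sender's "send resumed" entry appears only after the receiver has taken the element, which corresponds to cases 1 and 4 in the list.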

For reasons of space the source code is not reproduced here. Readers who want to study it can refer to: Kotlin coroutine Channel receive and send principle analysis

2.2 Differences between Channel and BlockingQueue

  1. Suspending instead of blocking. A Channel uses the suspending send and receive rather than the blocking put and take.
  2. Better performance. Suspending a coroutine is cheaper than BlockingQueue's approach, which blocks and wakes threads and wastes thread resources while they are blocked.
  3. Closable. A Channel can be closed at any time. Once close is called, the sender stops sending immediately, and once the buffered elements have been received, the receiving side closes as well.
  4. Exception handling. With structured concurrency, when one producer or consumer coroutine throws an exception, all producer and consumer coroutines can be cancelled immediately. This avoids the multithreading problem where one task fails and the others are mistakenly assumed to have succeeded.
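The closing behaviour can be sketched as follows. This is a small example under the assumption that kotlinx.coroutines 1.5 or later is available. The function name drainAfterClose is made up for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical demo: elements buffered before close() are still delivered,
// while sends attempted after close() are rejected.
fun drainAfterClose(): Pair<List<Int>, Boolean> = runBlocking {
    val channel = Channel<Int>(capacity = 3)
    channel.send(1)
    channel.send(2)
    channel.close() // the sending side is finished

    // A send after close is refused; trySend reports this instead of throwing
    val rejected = channel.trySend(3).isClosed

    val received = mutableListOf<Int>()
    // Iteration ends on its own once the buffer has been drained
    for (element in channel) received += element

    received to rejected
}

fun main() {
    println(drainAfterClose()) // ([1, 2], true)
}
```

The suspending send would throw ClosedSendChannelException in the same situation, which is why trySend is used here to inspect the result.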

For details, see Kotlin Channel and producer-consumer model

2.3 How is Channel thread-safe?

We know that a Channel is used for communication between multiple coroutines, which may run on multiple threads, so Channel also has to deal with thread safety. How does it ensure that?

The Channel buffer can be implemented as a linked list or an array

2.3.1 Linked list implementation

The linked list implementation stores the cache in LockFreeLinkedListHead

    internal abstract class AbstractSendChannel<E> : SendChannel<E> { 
        protected val queue = LockFreeLinkedListHead()
        ...
    }

LockFreeLinkedListHead is itself a node of a doubly linked list. On the Java virtual machine, “lock-free” means it is implemented with CAS (compare-and-swap) atomic operations. Its design comes from the paper Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap.
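To show what a CAS retry loop looks like, here is a minimal sketch of a Treiber stack. It is a deliberately simpler structure than the doubly linked list used by Channel and is not taken from the kotlinx.coroutines source. The names LockFreeStack and pushFromThreads are made up for illustration.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical illustration: a singly linked lock-free stack built on CAS.
class LockFreeStack<T> {
    private class Node<T>(val value: T, val next: Node<T>?)

    private val head = AtomicReference<Node<T>?>(null)

    fun push(value: T) {
        while (true) {
            val current = head.get()
            val updated = Node(value, current)
            // Succeeds only if no other thread changed head in the meantime;
            // otherwise loop and retry against the new head
            if (head.compareAndSet(current, updated)) return
        }
    }

    fun pop(): T? {
        while (true) {
            val current = head.get() ?: return null
            if (head.compareAndSet(current, current.next)) return current.value
        }
    }
}

// Pushes from several threads at once and counts what ended up in the stack.
fun pushFromThreads(threads: Int, perThread: Int): Int {
    val stack = LockFreeStack<Int>()
    val workers = List(threads) { thread { repeat(perThread) { stack.push(it) } } }
    workers.forEach { it.join() }
    var count = 0
    while (stack.pop() != null) count++
    return count
}

fun main() {
    println(pushFromThreads(threads = 4, perThread = 1000)) // 4000: no push was lost
}
```

No thread ever waits on a lock here. A thread that loses the race simply observes a failed compareAndSet and tries again.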

2.3.2 Array implementation

For the array version, ArrayChannel simply holds an array internally:

    // If the buffer size is larger than 8, an array of size 8 will be allocated first and expanded later
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8))

Access to the array is guarded directly by a ReentrantLock. So Channel achieves thread safety through either locking or CAS.
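The lock-based approach can be sketched as follows. This is a hypothetical ring buffer written for illustration, not the ArrayChannel source, and it omits the suspension of senders and receivers that a real channel performs. The class name LockedRingBuffer is an assumption.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical illustration: a fixed-capacity buffer whose every access
// goes through a single ReentrantLock.
class LockedRingBuffer<E : Any>(private val capacity: Int) {
    private val lock = ReentrantLock()
    private val buffer = arrayOfNulls<Any>(capacity)
    private var head = 0
    private var size = 0

    // Returns false instead of suspending when the buffer is full
    fun offer(element: E): Boolean = lock.withLock {
        if (size == capacity) return false
        buffer[(head + size) % capacity] = element
        size++
        true
    }

    // Returns null instead of suspending when the buffer is empty
    @Suppress("UNCHECKED_CAST")
    fun poll(): E? = lock.withLock {
        if (size == 0) return null
        val element = buffer[head] as E
        buffer[head] = null
        head = (head + 1) % capacity
        size--
        element
    }
}

fun main() {
    val buffer = LockedRingBuffer<Int>(2)
    println(buffer.offer(1)) // true
    println(buffer.offer(2)) // true
    println(buffer.offer(3)) // false: full
    println(buffer.poll())   // 1
}
```

Because withLock is an inline function, the early returns still release the lock through its finally block.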

2.4 Channel design principles

A Channel is a means of inter-coroutine communication. It follows the CSP (Communicating Sequential Processes) model. Compared with typical inter-thread communication schemes, its guiding principle is:

Do not communicate by sharing memory; instead, share memory by communicating.

However, threads and coroutines on a computer ultimately exchange information through “shared memory”: whatever the communication model, a thread or coroutine ends up reading the data from memory, and the underlying implementation of Channel also has to synchronize access to shared memory.

Since everything comes down to shared memory, how is this different from using shared memory ourselves? A more accurate question is: “Why synchronize information by sending messages instead of sharing memory directly among multiple threads or coroutines?”

  • 1. First, synchronizing information by sending messages is a higher level of abstraction than using shared memory and mutexes directly. A higher level of abstraction gives better encapsulation in program design and makes the program logic clearer.
  • 2. Second, message passing has advantages over shared memory in terms of decoupling. We can divide the responsibilities of threads into producers and consumers and decouple them through message passing, without relying on shared memory.
  • 3. Finally, message passing ensures that only one active thread can access a piece of data at a time, which naturally avoids thread contention and data races.

The above is only a summary of the CSP model. If you are interested, see the detailed analysis: Why use communication to share memory
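As a concrete illustration of sharing memory by communicating, the sketch below lets many coroutines update a counter without any lock: only one coroutine owns the counter, and the others send it messages. It assumes kotlinx.coroutines is available. The function name countViaMessages is made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: `total` is touched by exactly one coroutine, so it needs
// no mutex even though the increments come from several threads.
fun countViaMessages(workers: Int, perWorker: Int): Int = runBlocking {
    val increments = Channel<Int>()

    // The owner is the only coroutine that reads or writes `total`
    val owner = async {
        var total = 0
        for (delta in increments) total += delta
        total
    }

    // Producers run in parallel on the default thread pool;
    // withContext returns only after all of them have finished
    withContext(Dispatchers.Default) {
        repeat(workers) {
            launch { repeat(perWorker) { increments.send(1) } }
        }
    }

    increments.close() // lets the owner's loop finish
    owner.await()
}

fun main() {
    println(countViaMessages(workers = 8, perWorker = 500)) // 4000
}
```

The synchronization has not disappeared. It has moved inside the Channel, which is the point made by the first bullet about raising the level of abstraction.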

3. Basic use of Flow

Flow combines Kotlin coroutines with the reactive programming model. You will find it very similar to RxJava, and there is an API for converting between the two, which makes it easy to adopt. Flow has the following characteristics:

  1. It is a cold stream: nothing is produced until it is consumed. This is the opposite of Channel, whose sender does not depend on a receiver.
  2. flowOn changes the thread on which data is emitted; the thread on which data is consumed is determined by the coroutine that collects the flow.
  3. Exceptions are caught with catch, and completion is reported through onCompletion.
  4. Flow does not provide a cancel method. You cancel a Flow by cancelling the coroutine that collects it.

It is used as follows:

lifecycleScope.launch {
    flow {
        for (i in 1..10) {
            emit(i)
        }
    }.flowOn(Dispatchers.Main)
        .catch {
            // Exception handling
        }
        .onCompletion {
            // Completion callback
        }
        .collect { num ->
            // Consume the data here
            // Data is produced only once collect is called
            // ...
        }
}
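Because Flow has no cancel method of its own, stopping a flow means cancelling the coroutine that collects it. The following is a minimal sketch outside Android, assuming kotlinx.coroutines is available. The function name collectUntilCancelled is made up for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flow

// Hypothetical demo: an endless flow is stopped by cancelling its collector.
fun collectUntilCancelled(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        flow {
            var i = 0
            while (true) emit(i++) // endless producer
        }.collect { value ->
            seen += value
            // Cancels the launched coroutine; the flow builder notices this
            // on the next emit and stops producing
            if (value == 3) cancel()
        }
    }
    job.join()
    seen
}

fun main() {
    println(collectUntilCancelled()) // [0, 1, 2, 3]
}
```

In an Android screen the same effect is usually obtained implicitly, since lifecycleScope cancels its coroutines when the lifecycle is destroyed.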

4. How Flow works

Now that we have covered the basic use and characteristics of Flow, two questions arise: 1. Why is Flow a cold stream? 2. How does Flow switch threads?

4.1 Why is Flow a cold stream?

A cold stream starts producing data only when consumption starts: no consumption, no production. Let's look at the source, starting with what flow{} does.

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

As you can see, flow{} does something very simple: it creates a SafeFlow, which extends AbstractFlow. Let's look at what AbstractFlow contains.

public abstract class AbstractFlow<T> : Flow<T> {

    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 1. Wrap the collector
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 2. Hand the wrapped collector to the producer block
            collectSafely(safeCollector)
        } finally {
            // 3. Release coroutine-related state
            safeCollector.releaseIntercepted()
        }
    }

    // Implementations of collectSafely should follow these constraints:
    // 1. Do not switch context inside collectSafely, e.g. with withContext(Dispatchers.IO)
    // 2. collectSafely is not thread-safe by default
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

We can see that collect does three main things: 1. It wraps the data receiver, the FlowCollector, in a SafeCollector. 2. It calls the abstract method AbstractFlow#collectSafely. 3. It releases some coroutine-related state.

Combined with the SafeFlow we saw earlier, which implements AbstractFlow#collectSafely by calling collector.block(), this means collect runs the code in the flow{} block. Now it is clear why Flow is a cold stream: data is only emitted when collect is called, and every call to collect runs the block again.
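The cold behaviour is easy to confirm by counting how often the builder block runs. This is a minimal sketch, assuming kotlinx.coroutines is available. The function name coldFlowRuns is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns how many times the flow{} block had run
// before any collect call and after two collect calls.
fun coldFlowRuns(): Pair<Int, Int> = runBlocking {
    var runs = 0
    val numbers = flow {
        runs++ // executed once per collect call
        emit(1)
        emit(2)
    }

    val beforeCollect = runs // creating the flow ran nothing

    numbers.collect { }
    numbers.collect { }

    beforeCollect to runs
}

fun main() {
    println(coldFlowRuns()) // (0, 2)
}
```

Creating the flow only stores the block inside a SafeFlow. Each collect call then invokes the block from the start.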

4.2 How does Flow switch threads?

Flow switches threads in much the same way that coroutines do.

Both start a child coroutine and switch threads through the Dispatchers in its CoroutineContext.

The difference is that Flow uses a Channel to pass data across the switch.

The source code for Flow thread switching is too long to describe here. Interested readers can follow the source; for details see: flowOn() how to switch coroutine
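The effect of flowOn can be observed by recording thread names on both sides of the operator. This is a minimal sketch on the JVM, assuming kotlinx.coroutines is available. The function name threadsUsed is made up for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (thread that emitted, thread that collected).
fun threadsUsed(): Pair<String, String> = runBlocking {
    var emitThread = ""
    var collectThread = ""

    flow {
        emitThread = Thread.currentThread().name // upstream of flowOn
        emit(1)
    }
        .flowOn(Dispatchers.Default) // affects only the code above it
        .collect {
            collectThread = Thread.currentThread().name // the caller's context
        }

    emitThread to collectThread
}

fun main() {
    val (emitter, collector) = threadsUsed()
    println("emitted on $emitter, collected on $collector")
}
```

The emitting side runs on a worker thread of Dispatchers.Default while the collecting side stays on the thread of runBlocking, and the value crosses between them through the Channel mentioned above.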

Conclusion



As shown above, this article raised some questions about Channel and Flow and gave some answers.

Writing this took effort; if the article helped you, likes and bookmarks are welcome ~

References

Why use communication to share memory
Cracking Kotlin coroutines