Preface

The previous article covered asynchronous cold streams with Flow. This article turns to hot streams, starting with Channel.

So what is a Channel?

1. Channel

1.1 Getting to know Channel

A Channel is a concurrency-safe queue. It connects coroutines and lets them communicate with each other.

Let's try a small demo:

    @Test
    fun `test know channel`() = runBlocking<Unit> {
        val channel = Channel<Int>()
        // producer
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
        // consumer
        val consumer = GlobalScope.launch {
            while (true) {
                val element = channel.receive()
                println("receive $element")
            }
        }
        joinAll(producer, consumer)
    }

This is simple: two coroutines, one acting as the producer and one as the consumer.

Let's see how it runs:

receive 1
send 1
send 2
receive 2
... (omitted)
send 999
receive 999

That was easy enough, so let's move on to the next topic!

1.2 Channel capacity

A Channel is effectively a queue, and a queue has a buffer. Once the buffer is full and nobody has called receive to take an element out, send has to suspend. (The default Channel() is a rendezvous channel with no buffer at all, so every send waits for a matching receive.) If we deliberately slow down the receiver, we find that send stays suspended and only continues after receive.

That's a lot of concepts, so let's try a demo:

    @Test
    fun `test know channel2`() = runBlocking<Unit> {
        val channel = Channel<Int>()
        // producer
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
        // consumer
        val consumer = GlobalScope.launch {
            while (true) {
                delay(2000)
                val element = channel.receive()
                println("receive $element")
            }
        }
        joinAll(producer, consumer)
    }

Here the consumer takes longer to handle an element than the producer takes to produce one.

Let's see how it runs:

receive 1
send 1
receive 2 // send 2 waited 2 seconds
send 2
receive 3 // send 3 waited 2 seconds

This run confirms that send has to suspend once the buffer is full and nobody has called receive to take an element out.

In plain English: when the consumer needs more time to process an element than the producer needs to produce one, and the buffer is full, the producer gets "lazy" and waits until the consumer has made room in the buffer.
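The effect of the buffer size can also be seen without any timing. The following is a minimal sketch, assuming kotlinx.coroutines 1.5 or later (for trySend); acceptedWithoutReceiver is a hypothetical helper introduced only for this illustration. It counts how many elements a channel accepts while no receiver is present, which is exactly the point at which a real send would suspend.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many of `attempts` elements the channel
// accepts without suspending while nobody is receiving.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    repeat(attempts) { i ->
        // trySend never suspends: it fails where send() would have to wait.
        if (channel.trySend(i).isSuccess) accepted++
    }
    return accepted
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS, 5)) // 0: no buffer, every send must wait
    println(acceptedWithoutReceiver(3, 5))                  // 3: the buffer fills, then send must wait
    println(acceptedWithoutReceiver(Channel.UNLIMITED, 5))  // 5: the buffer never fills
}
```

A rendezvous channel accepts nothing without a receiver, a channel with capacity 3 accepts three elements, and an unlimited channel accepts all of them.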

That should be easy to understand. On to the next topic.

1.3 Iterating over a Channel

A Channel behaves much like a sequence, so when reading from it we can obtain its iterator directly.

    @Test
    fun `test iterate channel`() = runBlocking<Unit> {
        val channel = Channel<Int>(Channel.UNLIMITED)
        // producer
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send ${x * x}")
            }
        }
        // consumer
        val consumer = GlobalScope.launch {
            /*
            val iterator = channel.iterator()
            while (iterator.hasNext()) {
                val element = iterator.next()
                println("receive $element")
                delay(2000)
            }
            */

            // Either way works
            for (element in channel) {
                println("receive $element")
                delay(2000)
            }
        }
        joinAll(producer, consumer)
    }

The comments explain everything.

Let's see how it runs:

send 1
send 4
send 9
send 16
send 25 // the five sends complete almost instantly
receive 1 // after that, one element is received every two seconds
receive 4
receive 9
receive 16
receive 25

As we can see, this behaves completely differently from 1.2! Here the producer finishes all of its work up front, without waiting for the consumer to process each element.

As mentioned above, the producer only gets "lazy" under two conditions: first, the consumer takes longer to process an element than the producer takes to produce one; second, the buffer must be full.

Here the channel is created with val channel = Channel<Int>(Channel.UNLIMITED), so the buffer has no size limit and never fills up. The second condition is never met, and the producer never has to wait.

1.4 produce and actor

  • produce and actor are convenient ways to construct producers and consumers.
  • The produce builder starts a producer coroutine and returns a ReceiveChannel, which other coroutines can use to receive data. Conversely, the actor builder starts a consumer coroutine.

With the concepts out of the way, let's get hands-on.

1.4.1 Using produce

    @Test
    fun `test fast producer channel`() = runBlocking<Unit> {
        // producer
        val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
            repeat(100) {
                delay(1000)
                send(it)
            }
        }
        // consumer
        val consumer = GlobalScope.launch {
            for (i in receiveChannel) {
                println("received: $i")
            }
        }
        consumer.join()
    }

Let’s see how it works

received: 0 // one line is printed every second
received: 1
received: 2
received: 3
... (omitted)

GlobalScope.produce starts the producer coroutine and returns a ReceiveChannel, which the consumer uses to receive the data the producer generates. On to the next one!
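The same pattern also works inside a structured scope instead of GlobalScope. The sketch below is an illustration under assumptions rather than the article's own code: squares and collectSquares are hypothetical names, and the opt-in annotation is included because produce has been marked experimental in kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical producer: emits the squares of 1..n.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.squares(n: Int): ReceiveChannel<Int> = produce {
    for (x in 1..n) send(x * x)
    // No explicit close(): the channel is closed when this block returns.
}

// Hypothetical consumer: gathers everything the producer sends.
suspend fun collectSquares(n: Int): List<Int> = coroutineScope {
    val received = mutableListOf<Int>()
    for (value in squares(n)) received += value // the loop ends when the channel is closed
    received
}

fun main() = runBlocking<Unit> {
    println(collectSquares(5)) // [1, 4, 9, 16, 25]
}
```

Because the producer is a child of coroutineScope, the for loop ends by itself once the producer finishes and its channel closes.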

1.4.2 Using actor

    @Test
    fun `test fast consumer channel`() = runBlocking<Unit> {
        val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
            while (true) {
                val element = receive()
                println(element)
            }
        }

        val producer = GlobalScope.launch {
            for (i in 0..3) {
                sendChannel.send(i)
            }
        }

        producer.join()
    }

Let’s see how it works

0
1
2
3

GlobalScope.actor starts the consumer coroutine and returns a SendChannel. The producer sends data to that consumer by calling sendChannel.send(i).

On to the next one!

1.5 Closing a Channel

  • The Channel returned by produce or actor is closed automatically when the corresponding coroutine finishes, which is also why a Channel is called a hot stream.

  • If we call close on a Channel, it immediately stops accepting new elements; that is, isClosedForSend immediately returns true.

    • Because of the buffer, some elements may not have been processed yet, so isClosedForReceive only returns true after all elements have been read.
  • A Channel's lifecycle is best managed by the leading party, and that party should be the one to close it.

    • A single producer may serve several consumers, just as one teacher lectures to many students: the teacher, not the students, signals when class begins and ends.

As usual, with the concepts covered, here is the demo:

    @Test
    fun `test close channel`() = runBlocking<Unit> {
        val channel = Channel<Int>(3)
        // producer
        val producer = GlobalScope.launch {
            List(3) {
                channel.send(it)
                println("send $it")
            }
            // The producer owns the lifecycle, so it closes the channel
            channel.close()
            println("""close channel. 
                |  - ClosedForSend: ${channel.isClosedForSend}
                |  - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
        }

        // consumer
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive $element")
                delay(1000)
            }
            println("""After Consuming. 
                |   - ClosedForSend: ${channel.isClosedForSend} 
                |   - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
        }

        joinAll(producer, consumer)
    }

Here only the producer manages the lifecycle; the rest of the code just prints state.

Let’s see how it works

send 0
receive 0
send 1
send 2
close channel. 
 - ClosedForSend: true
 - ClosedForReceive: false
receive 1
receive 2
After Consuming. 
 - ClosedForSend: true 
 - ClosedForReceive: true

The output shows:

  • Once the producer has called close: ClosedForSend is true;
  • Once the consumer has read every element: ClosedForReceive is true.
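The two states can also be observed from the sender's and receiver's point of view. The following sketch, assuming kotlinx.coroutines 1.5 or later (for trySend), uses a hypothetical function drainAfterClose to show that a closed channel rejects new elements while the elements already in the buffer can still be read.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper. Returns:
//  - whether a send attempted after close() was rejected as "closed"
//  - the elements that could still be received after close()
suspend fun drainAfterClose(): Pair<Boolean, List<Int>> {
    val channel = Channel<Int>(3)
    channel.send(0) // fits into the buffer, does not suspend
    channel.send(1)
    channel.close() // from now on the channel accepts no new elements

    val rejected = channel.trySend(2).isClosed

    val drained = mutableListOf<Int>()
    for (element in channel) drained += element // ends once the buffer is empty
    return rejected to drained
}

fun main() = runBlocking<Unit> {
    println(drainAfterClose()) // (true, [0, 1])
}
```

Closing therefore ends sending right away, but receiving only ends after the buffered elements have been consumed.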

1.6 BroadcastChannel

As mentioned above, a Channel can have one producer and many consumers. As far as the data is concerned, even with several receivers each element is read by only one of them. Broadcasting is different: the receivers do not exclude one another, and every receiver gets every element.
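The mutual exclusion of an ordinary Channel can be checked with a small sketch. fanOut is a hypothetical helper, not part of the article's code: one producer sends the numbers 0 until elements, several consumers read from the same channel, and each consumer reports what it received.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns one list per consumer with the elements it received.
suspend fun fanOut(elements: Int, consumers: Int): List<List<Int>> = coroutineScope {
    val channel = Channel<Int>()
    launch {
        for (i in 0 until elements) channel.send(i)
        channel.close() // the producer owns the lifecycle
    }
    List(consumers) {
        async {
            val mine = mutableListOf<Int>()
            for (x in channel) mine += x
            mine.toList()
        }
    }.awaitAll()
}

fun main() = runBlocking<Unit> {
    val perConsumer = fanOut(elements = 10, consumers = 3)
    println(perConsumer)
    // Every element shows up exactly once across all consumers.
    println(perConsumer.flatten().sorted() == (0 until 10).toList()) // true
}
```

How the elements are split between the consumers can differ from run to run, but no element is ever delivered twice.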

Let’s see how this broadcast is used:

    @Test
    fun `test broadcast`() = runBlocking<Unit> {
        //val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        val channel = Channel<Int>() // uses the default capacity
        // Convert it into a broadcast channel with a buffer capacity of 3
        val broadcastChannel = channel.broadcast(3)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            // The lifecycle is managed by the leading party
            broadcastChannel.close()
        }

        // Create three consumers
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("[#$index] received: $i")
                }
            }
        }.joinAll()
    }

The comments explain everything.

Let’s see how it works

[#0] received: 0
[#1] received: 0
[#2] received: 0
[#0] received: 1
[#1] received: 1
[#2] received: 1
[#0] received: 2
[#2] received: 2
[#1] received: 2

From this output we can see that several consumers each receive the same elements from the same producer, with no mutual exclusion!

2. select: multiplexing

What is multiplexing?

In a data communication system or computer network, the bandwidth or capacity of the transmission medium is often larger than what a single signal needs. To use the line efficiently, we want one channel to carry several signals at once. This is called multiplexing.

2.1 Multiplexing multiple await

Two APIs fetch data from the network and from the local cache respectively, and whichever returns first is displayed.

2.1.1 Hands-on

The server side

public class UserServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String user = request.getParameter("user");
        if (user != null) {
            System.out.println(user);
        }
        System.out.println("doGet");
        PrintWriter out = response.getWriter();
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("name", "jason");
        jsonObject.addProperty("address", "California");
        out.write(jsonObject.toString());
        System.out.println(jsonObject.toString());
        out.close();
    }
}

The server uses a plain HttpServlet on Tomcat, without Spring Boot. The code is simple, so I won't go into it.

The client

private val cachePath = "E://coroutine.cache" // file contents: {"name":"HQK","address":"chengdu"}
private val gson = Gson()

data class Response<T>(val value: T, val isLocal: Boolean)

// Load user information locally
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
    delay(10000) // Intentional delay hangs for 10 seconds
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}

// Load user information through the network
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    userServiceApi.getUser(name)
}

class CoroutineTest02 {
    @Test
    fun `test select await`() = runBlocking<Unit> {
        GlobalScope.launch {
            val localRequest = getUserFromLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")

            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }
            userResponse.value?.let { println(it) }
        }.join()
    }
}

// Define the user data class
data class User(val name: String, val address: String)

//Retrofit network data requests
val userServiceApi: UserServiceApi by lazy {
    val retrofit = retrofit2.Retrofit.Builder()
            .client(OkHttpClient.Builder().addInterceptor {
                it.proceed(it.request()).apply {
                    Log.d("hqk", "request:${code()}")
                    //Log.d("hqk", "body:${body()?.string()}")
                }
            }.build())
            .baseUrl("http://10.0.0.130:8080/kotlinstudyserver/")
            .addConverterFactory(GsonConverterFactory.create())
            .build()
    retrofit.create(UserServiceApi::class.java)
}


interface UserServiceApi {
	
	// Get user information
    @GET("user")
    suspend fun getUser(@Query("name") name: String) : User
}


The test calls the two functions that load the user locally and over the network, registers the onAwait clause of each result inside select { }, and receives the winning result as the userResponse object.

Let’s see how it works

User(name=jason, address=California)

Because loading the local user is deliberately suspended for 10 seconds, the network request finishes sooner than the local load, so in this case the network data is shown.

Now let's comment out the 10-second delay in the local load and run it again:

User(name=HQK, address=chengdu)

Clearly, the local data is shown this time, not the network data.

When multiplexing several await calls, whichever returns first is the one displayed.
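The same race can be reproduced without a server or a cache file. The following is a minimal sketch in which the two sources are simulated with delay; firstToAnswer and its parameters are hypothetical names introduced for this illustration only.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: simulates a local and a remote source and
// returns the label of whichever one answers first.
suspend fun firstToAnswer(localDelayMs: Long, remoteDelayMs: Long): String = coroutineScope {
    val local = async { delay(localDelayMs); "local" }
    val remote = async { delay(remoteDelayMs); "remote" }

    val winner = select<String> {
        local.onAwait { it }
        remote.onAwait { it }
    }

    // Cancel the loser so that the scope does not wait for it to finish.
    local.cancel()
    remote.cancel()
    winner
}

fun main() = runBlocking<Unit> {
    println(firstToAnswer(localDelayMs = 500, remoteDelayMs = 50)) // remote
    println(firstToAnswer(localDelayMs = 50, remoteDelayMs = 500)) // local
}
```

Cancelling the slower request is a design choice of this sketch; the article's demo runs in GlobalScope and simply lets it continue.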

2.2 Multiplexing multiple Channels

As with await, select receives the message from whichever Channel delivers first.

    @Test
    fun `test select channel`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }

        GlobalScope.launch {
            delay(50)
            channels[1].send(100)
        }

        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)
    }

Let’s take a look at the results:

100

Here listOf gathers the channels into a list, and two coroutines are launched, each suspended for a different length of time before sending. In the end, select receives the message from the channel whose coroutine waited the shortest time.

2.3 SelectClause

How do we know which events can be selected? Every selectable event is of type SelectClauseN.

  • SelectClause0: the event has no return value. For example, join returns nothing, so onJoin is of type SelectClause0. When used, onJoin takes a function with no parameters.
  • SelectClause1: the event has a return value, as with onAwait and onReceive above (so no separate example is given below).
  • SelectClause2: the event has a return value and also needs an extra parameter. For example, Channel.onSend takes two arguments: the first is the value to send, and the second is the callback invoked when the send succeeds.

To check whether a suspending function supports select, simply see whether it has a corresponding SelectClauseN property.

Enough concepts; let's see them in action:

2.3.1 Example 1 (SelectClause0)

    @Test
    fun `test SelectClause0`() = runBlocking<Unit> {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }

        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }

        select<Unit> {
            job1.onJoin { println("job 1 onJoin") }
            job2.onJoin { println("job 2 onJoin") }
        }

        delay(1000)
    }

Let’s see how it works:

job 2
job 2 onJoin
job 1

This is a very standard case: the event returns no value, which is the SelectClause0 type described above.

2.3.2 Example 2 (SelectClause2)

    @Test
    fun `test SelectClause2`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit?> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel ->
                        println("sent 1 on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sentChannel ->
                        println("sent 0 on $sentChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
        delay(1000)
    }

Let’s see how it works

[RendezvousChannel@2a084b4c{EmptyQueue}, RendezvousChannel@42b93f6b{EmptyQueue}]
200
sent 1 on RendezvousChannel@42b93f6b{EmptyQueue} // the success callback ran

This example uses onSend. The first argument is the value to send, and the second is the callback: the code inside the braces runs once the send has succeeded.
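A more compact way to see onSend at work is to register both clauses directly in the select block. The sketch below is an illustration under assumptions, not the article's code: sendToFirstAvailable is a hypothetical name, one channel has no receiver and no buffer, and the other has a free buffer slot.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical helper: tries to send to two channels and reports which send went through.
suspend fun sendToFirstAvailable(): String {
    val busy = Channel<Int>()  // rendezvous channel without a receiver: send would suspend
    val free = Channel<Int>(1) // one free buffer slot: send can complete immediately
    return select<String> {
        busy.onSend(1) { "sent to busy" }
        free.onSend(1) { sentChannel ->
            // sentChannel is the channel the value was sent to
            if (sentChannel === free) "sent to free" else "unexpected channel"
        }
    }
}

fun main() = runBlocking<Unit> {
    println(sendToFirstAvailable()) // sent to free
}
```

select picks the clause whose send can actually complete, and the callback of that clause produces the result of the whole expression.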

2.4 Use Flow to achieve multiplexing

In most cases, we can achieve the effect of multiplexing by constructing the appropriate Flow.

private val cachePath = "E://coroutine.cache" // file contents: {"name":"HQK","address":"chengdu"}
private val gson = Gson()

data class Response<T>(val value: T, val isLocal: Boolean)

// Obtain user information locally
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
    // delay(10000) // Intentional delay
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}

// Obtain user information from the network
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    userServiceApi.getUser(name)
}



class CoroutineTest02 {
    @Test
    fun `test select flow`() = runBlocking<Unit> {
        // function -> coroutine -> Flow -> Flow merge
        val name = "guest"
        coroutineScope {
            // Put references to the two loader functions into a list
            listOf(::getUserFromLocal, ::getUserFromRemote)
                // Iterate over the list; function is the loader being called
                .map { function ->
                    function.call(name) // The returned Deferred is passed on to the next map
                }.map { deferred -> // Wrap each Deferred in a Flow
                    flow { emit(deferred.await()) } // Emit the value once the result is available
                }.merge() // Merge the flows
                .collect { user -> println(user) } // Receive each emitted value
        }
    }
}

The comments explain everything.

Let's see how it runs:

User(name=HQK, address=chengdu)
User(name=jason, address=California)

Here both the local and the network results are received!
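The merging step can be sketched without reflection, the cache file, or the server. In the following illustration the two sources are simulated with delay, and collectInCompletionOrder is a hypothetical name; it only assumes that merge for a collection of flows is available in the kotlinx.coroutines version in use.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: wraps each Deferred in a one-element Flow,
// merges the flows, and collects every result as it arrives.
suspend fun collectInCompletionOrder(): List<String> = coroutineScope {
    val sources = listOf(
        async { delay(300); "local" }, // simulated slow source
        async { delay(30); "remote" }  // simulated fast source
    )
    sources
        .map { deferred -> flow { emit(deferred.await()) } }
        .merge()
        .toList()
}

fun main() = runBlocking<Unit> {
    println(collectInCompletionOrder())
}
```

Unlike select, which yields only the first result, the merged Flow delivers every result, in the order in which the sources complete.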

3. Concurrency security

3.1 Insecure concurrent access

Thread safety is always a concern when threads are used for concurrency. The implementation of Kotlin coroutines on the Java platform is also subject to concurrent scheduling, so thread safety deserves the same attention there.

For example:

    @Test
    fun `test not safe concurrent`() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch { count++ }
        }.joinAll()
        println(count)
    }

We launch 1000 coroutines here, and each one increments count by one, so ideally the result should be 1000.

Let's see how it actually turns out:

973 // the value differs from run to run

The actual value falls short of the ideal one. count++ is a read-modify-write sequence rather than an atomic operation, so increments running on different threads can overwrite each other. Concurrency needs care!

3.2 Concurrency tools for coroutines

Besides the tools we already use to solve concurrency problems with threads, the coroutine framework provides its own concurrency-safe tools, including:

  • Channel: the concurrency-safe message channel we are already familiar with.

  • Mutex: a lightweight lock whose lock and unlock have semantics similar to a thread lock. It is lightweight because a coroutine that cannot acquire the lock does not block its thread; it suspends until the lock is released.

  • Semaphore: a lightweight semaphore. It can hold several permits, and a coroutine may carry out its concurrent operation once it has acquired one.

    • When a Semaphore is created with 1 permit, it behaves like a Mutex.

With that said, let's try them out!

3.2.1 Example 1 (using AtomicXXX)

    @Test
    fun `test safe concurrent`() = runBlocking<Unit> {
        val count = AtomicInteger(0)
        List(1000) {
            GlobalScope.launch { count.incrementAndGet() }
        }.joinAll()
        println(count.get())
    }

This is the general-purpose Java solution: atomic classes.

The result is 1000, so the output is not shown.

3.2.2 Example 2 (Using Mutex)

    @Test
    fun `test safe concurrent tools`() = runBlocking<Unit> {
        var count = 0
        val mutex = Mutex()
        List(1000) {
            GlobalScope.launch {
                mutex.withLock {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }

The Mutex object is created before the coroutines start, and each coroutine wraps its increment in mutex.withLock.

On to the next one!

3.2.3 Example 3 (Using Semaphore)

    @Test
    fun `test safe concurrent tools2`() = runBlocking<Unit> {
        var count = 0
        val semaphore = Semaphore(1)
        List(1000) {
            GlobalScope.launch {
                semaphore.withPermit {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }

Here Semaphore(1) creates the semaphore, and semaphore.withPermit guards the concurrent logic.
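With more than one permit, a Semaphore no longer serializes the coroutines; it only caps how many of them run the guarded section at once. The following sketch measures that cap. peakConcurrency is a hypothetical helper, and the 10 ms delay merely stands in for real work.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: runs `tasks` coroutines behind a semaphore and returns
// the highest number that were inside the guarded section at the same time.
suspend fun peakConcurrency(permits: Int, tasks: Int): Int = coroutineScope {
    val semaphore = Semaphore(permits)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    List(tasks) {
        launch(Dispatchers.Default) {
            semaphore.withPermit {
                val now = running.incrementAndGet()
                peak.updateAndGet { seen -> maxOf(seen, now) }
                delay(10) // simulated work while holding the permit
                running.decrementAndGet()
            }
        }
    }.joinAll()
    peak.get()
}

fun main() = runBlocking<Unit> {
    println(peakConcurrency(permits = 3, tasks = 20)) // never more than 3
    println(peakConcurrency(permits = 1, tasks = 20)) // 1, the same effect as a Mutex
}
```

Note that several permits do not make count++ safe; a shared counter still needs a single permit, a Mutex, or an atomic type.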

3.3 Avoid accessing external mutable states

    @Test
    fun `test avoid access outer variable`() = runBlocking<Unit> {
        var count = 0
        val result = count + List(1000) {
            GlobalScope.async { 1 }
        }.map { it.await() }.sum()
        println(result)
    }

Write functions so that they do not access external state: they should compute from their parameters and deliver the result through the return value.

Conclusion

That's the end of this article! You should now have a good grasp of Channel. The next article will cover the combined use of coroutines and Flow in detail.