Preface

Originally, this article was going to cover Jetpack’s Paging library. While organizing the material, however, I realized I had not yet explained Kotlin Flow, which is also a key topic, so this article explains Flow in detail.

That also makes it easier to explain Flow and Paging together later.

So, what is Flow?

1. Getting to Know Flow

1.1 Introduction to Kotlin Flow

The official document gives a brief introduction:

Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);

Flow is something like an RxJava Observable. An Observable can be Cold or Hot.

The official wording is terse, and for readers who are not familiar with RxJava, this definition alone is still rather hazy.

Let’s go deeper into Flow through a series of small cases.

1.1 How to Represent Multiple Values?

Think about it: a suspend function can asynchronously return a single value, but how do you asynchronously return multiple computed values?

emmm….

What can be done to return multiple values asynchronously?

  • A collection?
  • A sequence?
  • A suspend function?
  • And of course Flow, right?

So let’s see how this works, right?

1.1.1 Collection

    fun simpleList(): List<Int> = listOf(1, 2, 3)

    @Test
    fun `test multiple values`() {
        simpleList().forEach { value -> println(value) }
    }

This does return multiple values, but not asynchronously!

1.1.2 Sequence

    fun simpleSequence(): Sequence<Int> = sequence {
        for (i in 1..3) {
            // Thread.sleep(1000) // Blocks the current thread, so this is not asynchronous!
            // delay(1000) // Does not compile here: the sequence builder only allows its own suspend functions, such as yield
            yield(i)
        }
    }

    @Test
    fun `test multiple values`() {
        simpleSequence().forEach { value -> println(value) }
    }

This approach also returns multiple values, but again not asynchronously!

1.1.3 Suspend function

    suspend fun simpleList2(): List<Int> {
        delay(1000)
        return listOf(1, 2, 3)
    }

    @Test
    fun `test multiple values2`() = runBlocking<Unit> {
        simpleList2().forEach { value -> println(value) }
    }

This approach returns multiple values and is also asynchronous, so it meets the requirement. Note, though, that all the values arrive together as one list once the delay is over, not one at a time.

So what about Flow?

1.1.4 The Flow way

    fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            delay(1000) // Pretend to be doing something important
            emit(i) // Emit the next element
        }
    }

    @Test
    fun `test multiple values3`() = runBlocking<Unit> {
        simpleFlow().collect { value -> println(value) }
    }

Note that simpleFlow calls the suspend function delay inside the flow builder, yet it carries no suspend modifier, so it is not a suspend function. It can be called from anywhere, including non-coroutine, non-suspending code. Collecting the flow still requires a coroutine, because collect is a suspend function.

Having looked at the examples above, let’s conclude:

1.1.5 How Flow differs from the other approaches

  • The builder function for the Flow type is named flow
  • Code inside the flow { ... } builder block can suspend
  • The function simpleFlow is no longer marked with the suspend modifier
  • A flow emits values with the emit function
  • A flow’s values are collected with the collect function
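These points can be seen together in one minimal, self-contained sketch. The function name countdown is made up for illustration; the rest is the standard kotlinx.coroutines API, which is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical example: not a suspend function, so it can be called from anywhere.
fun countdown(from: Int): Flow<Int> = flow {
    for (i in from downTo 1) {
        delay(10)   // suspending inside the builder is allowed
        emit(i)     // hand one value to the collector
    }
}

fun main() {
    val flow = countdown(3)                     // plain call from non-suspending code
    val values = runBlocking { flow.toList() }  // collecting needs a coroutine
    println(values)                             // [3, 2, 1]
}
```

Creating the flow happens in ordinary code; only the collection step has to run inside a coroutine.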

Here’s another picture, just to make a better impression

Now that you have a general idea of Flow, what does it do?

1.2 Applications of Flow

File download is a classic use of Flow in Android development.

As shown in the figure:

While a file is downloading, the background task can report its progress by sending values with emit, and the receiving side picks them up with collect. (Jetpack’s Paging will be explained in a later article.)
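A rough sketch of that idea, with the network replaced by a simulated loop. DownloadStatus, downloadFile, and the byte counts are all invented for illustration; a real implementation would read chunks from an HTTP response instead of calling delay.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical status type for this sketch.
sealed class DownloadStatus {
    data class Progress(val percent: Int) : DownloadStatus()
    object Done : DownloadStatus()
}

// Simulated download: emits a progress value after each "chunk", then Done.
fun downloadFile(totalBytes: Int, chunkBytes: Int): Flow<DownloadStatus> = flow {
    var received = 0
    while (received < totalBytes) {
        delay(10)                                            // stand-in for reading one chunk
        received = minOf(received + chunkBytes, totalBytes)
        emit(DownloadStatus.Progress(received * 100 / totalBytes))
    }
    emit(DownloadStatus.Done)
}

fun main() = runBlocking {
    downloadFile(totalBytes = 1000, chunkBytes = 250).collect { status ->
        when (status) {
            is DownloadStatus.Progress -> println("Downloaded ${status.percent}%")
            DownloadStatus.Done -> println("Download finished")
        }
    }
}
```

The producer knows nothing about who displays the progress; the collector decides what to do with each status.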

1.3 Cold flows

    fun simpleFlow2() = flow<Int> {
        println("Flow started")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }

    @Test
    fun `test flow is cold`() = runBlocking<Unit> {
        val flow = simpleFlow2()
        println("Calling collect...")
        flow.collect { value -> println(value) }
        println("Calling collect again...")
        flow.collect { value -> println(value) }
    }

Result

Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

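This result shows:

A Flow is a cold stream, similar to a sequence: the code in the flow builder does not run until the flow is collected, and it runs again from the start for every new collection.

By contrast, a Channel is hot: it can produce values whether or not anyone is receiving them. (The channelFlow builder, despite its name, still creates a cold flow.)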
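The cold behavior can also be checked without reading logs, by counting how often the builder block actually starts. This is only a sketch; builderRuns and coldNumbers are names invented for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times the builder block has started (illustrative global).
var builderRuns = 0

fun coldNumbers(): Flow<Int> = flow {
    builderRuns++      // executed once per collection, never at creation time
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val numbers = coldNumbers()
    println("After creation: $builderRuns")          // 0
    numbers.collect { }
    numbers.collect { }
    println("After two collections: $builderRuns")   // 2
}
```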

1.4 Continuity of flows

    @Test
    fun `test flow continuation`() = runBlocking<Unit> {
        (1..5).asFlow().filter {
            it % 2 == 0
        }.map {
            "string $it"
        }.collect {
            println("Collect $it")
        }
    }

Result

Collect string 2
Collect string 4

From this result, we know:

  • Each individual collection of a flow runs sequentially, unless a special operator is used
  • Each emitted value is processed by every intermediate operator from upstream to downstream and is then delivered to the terminal operator
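To make the ordering visible, the sketch below records which stage sees which value. traceStages is a helper name invented for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order in which each stage sees each value.
suspend fun traceStages(): List<String> {
    val log = mutableListOf<String>()
    (1..3).asFlow()
        .filter { log += "filter $it"; it % 2 == 1 }
        .map { log += "map $it"; it * 10 }
        .collect { log += "collect $it" }
    return log
}

fun main() = runBlocking {
    // Prints: filter 1, map 1, collect 10, filter 2, filter 3, map 3, collect 30
    traceStages().forEach { println(it) }
}
```

Value 1 travels through every stage before value 2 is even filtered, which is exactly the sequential behavior described above.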

1.5 Flow builders

    @Test
    fun `test flow builder`() = runBlocking<Unit> {
        flowOf("one", "two", "three")
                .onEach { delay(1000) }
                .collect { value ->
                    println(value)
                }

        (1..3).asFlow().collect { value ->
            println(value)
        }
    }

Result

one
two
three
1
2
3

This result shows that a flow can also be built with flowOf and asFlow.

1.6 Flow context

A few small cases explain this in detail.

1.6.1 Case 1

    fun simpleFlow3() = flow<Int> {
        println("Flow started ${Thread.currentThread().name}")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }

    @Test
    fun `test flow context`() = runBlocking<Unit> {
        simpleFlow3()
                .collect { value -> println("Collected $value ${Thread.currentThread().name}") }
    }

Result

Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

Here you can see that emission and collection run in the same context from start to finish.

What if you want to emit in a different context?

1.6.2 Case 2

    fun simpleFlow4() = flow<Int> {
        withContext(Dispatchers.Default) {
            println("Flow started ${Thread.currentThread().name}")
            for (i in 1..3) {
                delay(1000)
                emit(i)
            }
        }
    }

    @Test
    fun `test flow context`() = runBlocking<Unit> {
        simpleFlow4()
                .collect { value -> println("Collected $value ${Thread.currentThread().name}") }
    }

Result

Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@3f3e9270, BlockingEventLoop@67bc91f8],
		...

Oops: the test fails with an exception. Code inside the flow builder is not allowed to emit from a different context through withContext.

1.6.3 Case 3

    fun simpleFlow5() = flow<Int> {
        println("Flow started ${Thread.currentThread().name}")
        for (i in 1..3) {
            delay(1000)
            emit(i)
        }
    }.flowOn(Dispatchers.Default)

    @Test
    fun `test flow context`() = runBlocking<Unit> {
        simpleFlow5()
                .collect { value -> println("Collected $value ${Thread.currentThread().name}") }
    }

Result

Flow started DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

From these small cases we can conclude:

  • Collection of a flow always happens in the context of the calling coroutine. This property of flows is called context preservation.
  • Code in the flow { ... } builder must honor context preservation and is not allowed to emit from a different context
  • The flowOn operator changes the context in which the flow emits
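One detail worth seeing in code is that flowOn only affects the operators above it in the chain. The sketch below records the thread name at each stage; traceThreads is a helper invented for the example, and the exact thread names depend on the environment.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.CopyOnWriteArrayList

// Returns (stage, thread name) pairs to show where each stage ran.
suspend fun traceThreads(): List<Pair<String, String>> {
    val trace = CopyOnWriteArrayList<Pair<String, String>>() // written from two threads
    flowOf(1)
        .map { trace += "upstream map" to Thread.currentThread().name; it }
        .flowOn(Dispatchers.Default)   // applies only to the stages above this line
        .map { trace += "downstream map" to Thread.currentThread().name; it }
        .collect { trace += "collect" to Thread.currentThread().name }
    return trace.toList()
}

fun main() = runBlocking {
    // The upstream map runs on a DefaultDispatcher worker;
    // the downstream map and collect run on the caller's thread.
    traceThreads().forEach { (stage, thread) -> println("$stage -> $thread") }
}
```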

1.7 Launching a flow

1.7.1 Case 1

    // Event source
    fun events() = (1..3)
            .asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.Default)

    @Test
    fun `test flow launch`() = runBlocking<Unit> {
        val job = events()
                .onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
                // .collect {}
                .launchIn(CoroutineScope(Dispatchers.IO)) // Collect in a scope with a different context
                // .launchIn(this) // Collect in the current scope

        delay(200)
        job.cancelAndJoin()
    }

Here collect has been replaced with launchIn, and a new scope is passed in; the flow is collected in that scope’s context.

launchIn returns a Job. Because this scope is not a child of runBlocking, the test has to call job.join() or job.cancelAndJoin() to wait for the flow to finish.

Result

Event: 1 DefaultDispatcher-worker-1 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2

Because the test waits only 200 milliseconds before cancelling, not all events are printed. The thread name shows a DefaultDispatcher worker, since Dispatchers.IO shares its threads with Dispatchers.Default.

Let’s look at case two

1.7.2 Case 2

    // Event source
    fun events() = (1..3)
            .asFlow()
            .onEach { delay(100) }
            .flowOn(Dispatchers.Default)

    @Test
    fun `test flow launch`() = runBlocking<Unit> {
        val job = events()
                .onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
                // .collect {}
                // .launchIn(CoroutineScope(Dispatchers.IO)) // Collect in a scope with a different context
                .launchIn(this) // Collect in the current scope

        // delay(200)
        // job.cancelAndJoin()
    }

Result

Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2

Because the flow is launched in the current runBlocking scope, runBlocking waits for this child coroutine to complete, so no explicit join is needed.
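The fact that launchIn returns immediately can be made explicit with a small sketch. launchDemo is a name invented for the example; it takes a snapshot of the received values right after launchIn and again after join.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns what had been received right after launchIn, and what had been received after join.
fun launchDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val received = mutableListOf<Int>()
    val job = (1..3).asFlow()
        .onEach { received += it }
        .launchIn(this)              // starts a child coroutine and returns its Job at once
    val before = received.toList()   // snapshot taken before the child has had a chance to run
    job.join()                       // suspend until collection has finished
    before to received.toList()
}

fun main() {
    val (before, after) = launchDemo()
    println("Right after launchIn: $before")  // []
    println("After join: $after")             // [1, 2, 3]
}
```

Putting the work in onEach and ending the chain with launchIn is what lets the calling code continue while the flow is collected in the background.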

1.8 Flow cancellation

1.8.1 Passive Cancellation

    fun simpleFlow6() = flow<Int> {
        for (i in 1..3) {
            delay(1000)
            emit(i)
            println("Emitting $i")
        }
    }

    @Test
    fun `test cancel flow`() = runBlocking<Unit> {
        withTimeoutOrNull(2500) {
            simpleFlow6().collect { value -> println(value) }
        }
        println("Done")
    }

Result

1
Emitting 1
2
Emitting 2
Done

Here withTimeoutOrNull sets a timeout; when it expires, collection is cancelled and the flow stops executing.

But this kind of cancellation is passive. What if you want to cancel actively?

1.8.2 Active cancellation

    @Test
    fun `test cancel flow check`() = runBlocking<Unit> {
        (1..5).asFlow().collect { value ->
            println(value)
            if (value == 3) cancel()
            println("cancel check ${coroutineContext[Job]?.isActive}")
        }
    }

Result

1
cancel check true
2
cancel check true
3
cancel check false
4
cancel check false
5
cancel check false

Here we see that after the active cancel the job state changes, but all the values are still processed!

This is because this flow does not check for cancellation: for performance reasons, asFlow and most other operators do not perform a cancellation check on every emission.

1.8.3 Flow cancellation checks

    @Test
    fun `test cancel flow check`() = runBlocking<Unit> {
        (1..5).asFlow().cancellable().collect { value ->
            println(value)
            if (value == 3) cancel()
            println("cancel check ${coroutineContext[Job]?.isActive}")
        }
    }

Here the cancellable operator is used!

Run it again and see what happens

1
cancel check true
2
cancel check true
3
cancel check false

OK, here you can see that it has been successfully cancelled! On to the next topic!
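The two behaviors can be compared side by side in one sketch. Here the collection runs in a child coroutine so that cancelling it does not cancel runBlocking itself; collectUntilCancelled is a helper invented for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects 1..5 in a child coroutine that cancels itself at 3; returns the values seen.
fun collectUntilCancelled(checkCancellation: Boolean): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val source = (1..5).asFlow()
    val flow = if (checkCancellation) source.cancellable() else source
    val job = launch {
        flow.collect { value ->
            seen += value
            if (value == 3) cancel()   // cancels only this child coroutine
        }
    }
    job.join()
    seen
}

fun main() {
    println(collectUntilCancelled(checkCancellation = false)) // [1, 2, 3, 4, 5]
    println(collectUntilCancelled(checkCancellation = true))  // [1, 2, 3]
}
```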

1.9 Backpressure

Before looking at the operators, let’s start with a baseline case

    fun simpleFlow8() = flow<Int> {
        for (i in 1..3) {
            delay(100)
            emit(i)
            println("Emitting $i ${Thread.currentThread().name}")
        }
    }

    @Test
    fun `test flow back pressure`() = runBlocking<Unit> {
        val time = measureTimeMillis {
            simpleFlow8()
                .collect { value ->
                    delay(300)   // Processing this element takes 300 ms
                    println("Collected $value ${Thread.currentThread().name}")
                }
        }
        println("Collected in $time ms")
    }

Let’s see how it works

Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected in 1237 ms

Here we see a very standard one-to-one producer-consumer pattern: each element is emitted and fully processed before the next one is produced. So what happens if we add different operators?

1.9.1 buffer(xx)

    @Test
    fun `test flow back pressure`() = runBlocking<Unit> {
        val time = measureTimeMillis {
            simpleFlow8()
                .buffer(50)
                .collect { value ->
                    delay(300)   // Processing this element takes 300 ms
                    println("Collected $value ${Thread.currentThread().name}")
                }
        }
        println("Collected in $time ms")
    }

Here .buffer(50) has been added!

Let’s see how it works

Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Collected in 1108 ms

Here the producer no longer waits for the consumer: all three values are emitted into the buffer first, and the consumer then processes them one by one. The total time is also somewhat shorter than in the baseline case.

As shown in the figure:

For now, we can think of buffer(50) as lengthening the transfer channel so that it can hold more elements.

Next, move on to the next operator!

1.9.2 conflate()

    @Test
    fun `test flow back pressure`() = runBlocking<Unit> {
        val time = measureTimeMillis {
            simpleFlow8()
                .conflate()
                .collect { value ->
                    delay(300)   // Processing this element takes 300 ms
                    println("Collected $value ${Thread.currentThread().name}")
                }
        }
        println("Collected in $time ms")
    }

Result

Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Collected in 800 ms

Here the consumer did not process every element the producer emitted: value 2 was skipped because a newer value had arrived while value 1 was still being processed. Move on to the next one!

1.9.3 collectLatest {}

    @Test
    fun `test flow back pressure`() = runBlocking<Unit> {
        val time = measureTimeMillis {
            simpleFlow8()
                .collectLatest { value ->
                    delay(300)   // Processing this element takes 300 ms
                    println("Collected $value ${Thread.currentThread().name}")
                }
        }

        println("Collected in $time ms")
    }

This time collect is replaced with collectLatest, which looks as if it handles only the last element

Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Collected 3 Test worker @coroutine#5
Collected in 807 ms

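Indeed: collectLatest cancels the processing of the previous value whenever a new one arrives, so only the last value is processed to completion. One last summary!

1.9.4 Backpressure summary

  • buffer(): runs the code that emits elements concurrently with the code that collects them;
  • conflate(): conflates emissions, skipping intermediate values instead of processing each one;
  • collectLatest(): cancels the collector and restarts it every time a new value is emitted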
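The three strategies can be compared by looking at which values the slow collector actually finishes processing. This is a sketch under the same timing as the cases above (a value every 100 ms, 300 ms of processing); ticks and the three helper functions are names invented here, and the conflate result depends on timing, so treat it as typical rather than guaranteed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Producer: one value every 100 ms.
fun ticks(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Each helper uses a slow collector and returns the values it finished processing.
suspend fun withBuffer(): List<Int> {
    val done = mutableListOf<Int>()
    ticks().buffer().collect { delay(300); done += it }
    return done
}

suspend fun withConflate(): List<Int> {
    val done = mutableListOf<Int>()
    ticks().conflate().collect { delay(300); done += it }
    return done
}

suspend fun withCollectLatest(): List<Int> {
    val done = mutableListOf<Int>()
    ticks().collectLatest { delay(300); done += it }
    return done
}

fun main() = runBlocking {
    println("buffer:        ${withBuffer()}")        // [1, 2, 3]
    println("conflate:      ${withConflate()}")      // typically [1, 3]
    println("collectLatest: ${withCollectLatest()}") // [3]
}
```

buffer loses nothing, conflate drops stale intermediate values, and collectLatest abandons unfinished work as soon as something newer arrives.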

2. Operators

2.1 Intermediate flow operators

2.1.1 Case 1

    suspend fun performRequest(request: Int): String {
        delay(500)
        return "response $request"
    }

    @Test
    fun `test transform flow operator`() = runBlocking<Unit> {
        (1..3).asFlow()
            .map { request -> performRequest(request) }
            .collect { value -> println(value) }
    }

Result

response 1
response 2
response 3

Here map transforms each element of the flow and passes the results downstream one by one!

What if you want to emit extra elements of your own before or after each one?

    suspend fun performRequest(request: Int): String {
        delay(500)
        return "response $request"
    }

    @Test
    fun `test transform flow operator`() = runBlocking<Unit> {
        (1..3).asFlow()
                .transform { request ->
                    emit("Making request $request")
                    emit(performRequest(request))
                }.collect { value -> println(value) }
    }

Here transform is used, and emit is called inside its lambda to send elements. Let’s see how this works

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

As this result shows, transform lets us emit custom elements!

2.1.2 Case 2

    fun numbers() = flow<Int> {
        try {
            emit(1)
            emit(2)
            println("This line will not execute")
            emit(3)
        } finally {
            println("Finally in numbers")
        }
    }

    @Test
    fun `test limit length operator`() = runBlocking<Unit> {
        // take(2): the original flow is cancelled once two elements have been consumed
        numbers().take(2).collect { value -> println(value) }
    }

Let’s see how it works

1
2
Finally in numbers

You can see from this run that once the consumer has processed element 2, the original flow is cancelled!

2.1.3 Summary

Intermediate flow operators:

  • Operators can transform flows, just as they transform collections and sequences;
  • An intermediate operator is applied to an upstream flow and returns a downstream flow;
  • These operators are cold, just like flows, and calling one is not itself a suspend function;
  • They return quickly, yielding the definition of a new, transformed flow

2.2 Terminal Operators

A terminal operator is a suspend function on a flow that starts collection of the flow. collect is the most basic terminal operator, but there are other, more convenient ones:

  • Conversion to various collections, such as toList and toSet;
  • first, to get the first value, and single, to ensure that the flow emits exactly one value
  • reduce and fold, to reduce a flow to a single value

Having said that, let’s see how it works!

    @Test
    fun `test terminal operator`() = runBlocking<Unit> {
        val sum = (1..5).asFlow()
            .map {
                println("it * it= ${it * it}")
                it * it
            }
            // Start accumulating from the first element, applying the operation to the current accumulator value and each element
            .reduce { a, b ->
                println("a=$a,b=$b,a+b=${a + b}")
                a + b
            }
        println(sum)
    }

Result

it * it= 1
it * it= 4
a=1,b=4,a+b=5
it * it= 9
a=5,b=9,a+b=14
it * it= 16
a=14,b=16,a+b=30
it * it= 25
a=30,b=25,a+b=55
55

The comment in the code explains it, if only briefly! Next!
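The other terminal operators from the list can be tried the same way. A small sketch; squares is a helper name invented for the example, while toList, toSet, first, single, and fold are the standard kotlinx.coroutines operators.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The squares of 1..5 as a flow.
fun squares(): Flow<Int> = (1..5).asFlow().map { it * it }

fun main() = runBlocking {
    println(squares().toList())                         // [1, 4, 9, 16, 25]
    println(squares().toSet().size)                     // 5
    println(squares().first())                          // 1
    println(flowOf(42).single())                        // 42
    println(squares().fold(100) { acc, v -> acc + v })  // 155 (fold starts from an initial value)
}
```

Unlike reduce, fold takes an explicit initial value, so it also works on an empty flow.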

2.3 Combining Multiple Flows

Just like the Sequence.zip extension function in the Kotlin standard library, flows have a zip operator that combines the corresponding values of two flows!

Without further ado, let’s try it!

2.3.1 Case 1

    @Test
    fun `test zip`() = runBlocking<Unit> {
        val numbs = (1..3).asFlow()
        val strs = flowOf("One", "Two", "Three")
        numbs.zip(strs) { a, b -> "$a -> $b" }.collect { println(it) }
    }

Result

1 -> One
2 -> Two
3 -> Three

Very simple: zip combines the two flows into one, which is then printed! Since that was so simple, let’s do something a little more complicated!

2.3.2 Case 2

    @Test
    fun `test zip2`() = runBlocking<Unit> {
        val numbs = (1..3).asFlow().onEach { delay(300) }
        val strs = flowOf("One", "Two", "Three").onEach { delay(400) }
        val startTime = System.currentTimeMillis()
        numbs.zip(strs) { a, b -> "$a -> $b" }.collect {
            println("$it at ${System.currentTimeMillis() - startTime} ms from start")
        }
    }

It is only slightly more complicated: each flow now suspends before every element, simulating a time-consuming operation

Result

1 -> One at 462 ms from start
2 -> Two at 861 ms from start
3 -> Three at 1269 ms from start

Because these are suspensions rather than blocking calls, numbs and strs each do their own work without interfering with each other while their emitted elements are combined.

So the interval between results is set by the slower flow, strs (about 400 ms), not by numbs plus strs (700 ms). I hope this helps readers better understand the difference between suspending and blocking.

2.4 Flattening flows

Flows represent sequences of values received asynchronously, so it is easy to run into situations where each value triggers a request for another sequence of values. Because flows are asynchronous, different flattening modes are needed, and so there is a family of flattening operators:

  • flatMapConcat: concatenating mode;
  • flatMapMerge: merging mode;
  • flatMapLatest: latest-value mode

Let’s see how it works!

2.4.1 flatMapConcat: concatenating mode

    fun requestFlow(i: Int) = flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

    @Test
    fun `test flatMapConcat`() = runBlocking<Unit> {
        // Flow<Flow<String>>
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            // .map { requestFlow(it) }
            .flatMapConcat { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }

Result

1: First at 147 ms from start
1: Second at 653 ms from start
2: First at 755 ms from start
2: Second at 1256 ms from start
3: First at 1357 ms from start
3: Second at 1859 ms from start

Here the inner flows are concatenated: each inner flow is collected to completion before the next one starts, like components connected in series

2.4.2 flatMapMerge: merging mode

    fun requestFlow(i: Int) = flow<String> {
        emit("$i: First")
        delay(500)
        emit("$i: Second")
    }

    @Test
    fun `test flatMapMerge`() = runBlocking<Unit> {
        // Flow<Flow<String>>
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            // .map { requestFlow(it) }
            .flatMapMerge { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }

Result

1: First at 166 ms from start
2: First at 261 ms from start
3: First at 366 ms from start
1: Second at 668 ms from start
2: Second at 762 ms from start
3: Second at 871 ms from start

In this mode the inner flows are collected concurrently and their values are merged into a single flow as soon as they are emitted. That is why all the First values arrive before any Second value, and no value is dropped!

2.4.3 flatMapLatest: latest-value mode

    @Test
    fun `test flatMapLatest`() = runBlocking<Unit> {
        // Flow<Flow<String>>
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            // .map { requestFlow(it) }
            .flatMapLatest { requestFlow(it) }
            .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
    }

Result

1: First at 164 ms from start
2: First at 364 ms from start
3: First at 469 ms from start
3: Second at 971 ms from start

This mode resembles collectLatest from the backpressure section: whenever a new value is emitted, collection of the previous inner flow is cancelled. Very simple!
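For a compact comparison, the sketch below runs the same input through all three modes and returns the emitted values as lists. inner, outer, and flattened are names invented for the example, and the @OptIn is there because these operators carry experimental or preview annotations depending on the kotlinx.coroutines version.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Inner flow: two values, 500 ms apart.
fun inner(i: Int): Flow<String> = flow {
    emit("$i:a")
    delay(500)
    emit("$i:b")
}

// Outer flow: a new value every 100 ms.
fun outer(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun flattened(mode: String): List<String> {
    val result = when (mode) {
        "concat" -> outer().flatMapConcat { inner(it) }
        "merge" -> outer().flatMapMerge { inner(it) }
        else -> outer().flatMapLatest { inner(it) }
    }
    return result.toList()
}

fun main() = runBlocking {
    println(flattened("concat")) // [1:a, 1:b, 2:a, 2:b, 3:a, 3:b]
    println(flattened("merge"))  // [1:a, 2:a, 3:a, 1:b, 2:b, 3:b]
    println(flattened("latest")) // [1:a, 2:a, 3:a, 3:b]
}
```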

3. Exception handling

3.1 Exception handling of flows

When an exception is thrown by an emitter or code in an operator, there are several ways to handle exceptions:

  • A try/catch block
  • The catch operator

3.1.1 Case 1 (try/catch block)

    fun simpleFlow() = flow<Int> {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }

    @Test
    fun `test flow exception`() = runBlocking<Unit> {
        try {
            simpleFlow().collect { value ->
                println(value)
                // If the condition is false, check throws IllegalStateException with the result of the lazy message lambda
                check(value <= 1) { "Collected $value" }
            }
        } catch (e: Throwable) {
            println("Caught $e")
        }
    }

Result

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

That’s so easy. Next!

3.1.2 Case 2 (catch operator)

    @Test
    fun `test flow exception2`() = runBlocking<Unit> {
        flow {
            emit(1)
            throw ArithmeticException("Div 0")
        }.catch { e: Throwable -> println("Caught $e") }
            .flowOn(Dispatchers.IO)
            .collect { println(it) }
    }

Now the exception is handled directly in the catch { } block.

Result

1
Caught java.lang.ArithmeticException: Div 0

Here all the handling logic sits inside the catch block. Can it be moved out to the collector instead?

    @Test
    fun `test flow exception2`() = runBlocking<Unit> {
        flow {
            emit(1)
            throw ArithmeticException("Div 0")
        }.catch { e: Throwable ->
            println("Caught $e")
            emit(10)
        }.flowOn(Dispatchers.IO).collect { println(it) }

    }

Now catch emits a fallback value, so the handling moves out to the collector, and both the layout and the readability of the code are better than before

Let’s see how it works

Caught java.lang.ArithmeticException: Div 0
1
10
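One property of catch is easy to miss: it only handles exceptions from the flow above it, not exceptions thrown later in collect. The sketch below shows both sides; withFallback and downstreamFailure are helper names invented for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Upstream failure: catch handles it and emits a fallback value.
suspend fun withFallback(): List<Int> =
    flow<Int> {
        emit(1)
        throw ArithmeticException("Div 0")
    }.catch { emit(10) }.toList()

// Downstream failure: the exception thrown in collect is not seen by catch.
suspend fun downstreamFailure(): String =
    try {
        flowOf(1, 2)
            .catch { emit(-1) }   // never triggered in this example
            .collect { value -> check(value < 2) { "Collected $value" } }
        "no exception"
    } catch (e: IllegalStateException) {
        "escaped catch: ${e.message}"
    }

fun main() = runBlocking {
    println(withFallback())       // [1, 10]
    println(downstreamFailure())  // escaped catch: Collected 2
}
```

To cover the collector as well, one option is to move its logic into onEach above the catch and finish the chain with an empty collect.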

3.2 Flow completion

When collection of a flow completes (normally or exceptionally), you may need to run some action. There are two ways to do so:

  • An imperative finally block
  • Declarative handling with onCompletion

3.2.1 Case 1 (finally block)

    fun simpleFlow2() = (1..3).asFlow()

    @Test
    fun `test flow complete in finally`() = runBlocking<Unit> {
        try {
            simpleFlow2().collect { println(it) }
        } finally {
            println("Done")
        }
    }

This is a very simple code block, so I won’t paste its output here; as you would expect, Done is printed at the end!

3.2.2 Case 2 (onCompletion)

    @Test
    fun `test flow complete in onCompletion`() = runBlocking<Unit> {
        simpleFlow2()
                .onCompletion { println("Done") }
                .collect { println(it) }
    }

Here the try ... finally is removed and an onCompletion block is added, which implements the same logic

Let’s see how it works

1
2
3
Done

Obviously the same as above!
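One thing onCompletion offers beyond finally is its nullable cause parameter, which tells whether the flow completed normally or with an exception. A small sketch; completionCauses is a helper name invented for the example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Logs what onCompletion reports for a normal and for a failing flow.
suspend fun completionCauses(): List<String> {
    val log = mutableListOf<String>()

    flowOf(1, 2)
        .onCompletion { cause -> log += "normal flow: cause=$cause" }
        .collect { }

    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        .onCompletion { cause -> log += "failing flow: cause=${cause?.message}" }
        .catch { e -> log += "caught ${e.message}" }   // onCompletion does not handle the exception
        .collect { }

    return log
}

fun main() = runBlocking {
    // Prints:
    // normal flow: cause=null
    // failing flow: cause=boom
    // caught boom
    completionCauses().forEach { println(it) }
}
```

onCompletion only observes the exception; it still travels downstream, which is why the catch operator after it is needed here.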

Conclusion

Well, that’s the end of this article! I hope readers who made it this far now have a clear understanding of asynchronous Flow. The next article will cover channels, multiplexing, and concurrency safety with Flow.