Android Kotlin Coroutines: Flow and Channel

Flow

Introduction

Flow is a set of APIs added to Kotlin Coroutines starting with the 1.2.0 alpha releases. Also known as asynchronous flows, it combines Kotlin coroutines with the reactive programming model.

In effect, Flow is Kotlin's counterpart to RxJava.

Much of what Flow does can also be done with RxJava, but Flow provides a more straightforward API.

Moreover, Google already uses Flow in Compose, Paging, ViewModel and more. It is therefore worth learning Flow; otherwise, you will struggle to read Google's source code.

The definition of Flow

Like RxJava, Flow represents a stream of data. Since it is a data stream, it needs two basic APIs: one for emitting and one for collecting.

In Kotlin, Flow is just an interface:

public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

It has a single method, collect, which takes a FlowCollector.

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

FlowCollector is just as simple: it has a single method, emit, which emits a value.

Like RxJava, Flow provides a large number of operators in the form of extension functions, such as map:

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
   return@transform emit(transform(value))
}

Creating a Flow

The simplest way is the top-level function flow:

flow {
    (1..3).forEach { emit(it) }
}

It can also be created using the top-level function flowOf:

flowOf(1, 2, 3)

Or we can convert a collection or range into a flow:

(1..3).asFlow()

The library provides many other builder functions, which are not listed here. Let's look at the definition of flow:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

It is worth noting that:

  1. The block parameter is marked with the suspend keyword, so suspending functions can be called inside it.
  2. emit in flow is not thread-safe, so do not use withContext inside the builder to change the coroutine context. If you need to change it, use flowOn, which is described in more detail below. By default, a Flow emits and collects data in the same coroutine.
  3. Flow does not need a separate backpressure strategy. By default, data is emitted and collected in the same coroutine context, so each value has to be collected before the next one is emitted. This changes once flowOn is used: flowOn gives you a ChannelFlowOperatorImpl; see the ChannelFlow discussion.
  4. Flow is a cold stream. That is, a Flow does not start emitting data until collect is called.
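The cold behaviour in point 4 can be illustrated with a minimal sketch. The function name coldFlowDemo and the event strings are made up for this illustration; the sketch only assumes kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order of events to show that the
// builder block does not run until collect is called.
fun coldFlowDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        events += "builder started"
        emit(1)
        emit(2)
    }
    events += "flow created" // the builder block has not run yet
    numbers.collect { events += "collected $it" }
    events
}
```

Calling coldFlowDemo() should return "flow created" first and "builder started" second, which is the sense in which the flow is cold.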

Terminal operator

As mentioned above, a flow starts emitting data only after a terminal operator is called. In addition to collect, common terminal operators include:

  1. Collection conversions, such as toList and toSet.
  2. Operators that get a specific element of the stream, such as first and last.
  3. Accumulating terminal operators, such as reduce and fold.
  4. collectLatest, which processes only the latest data. When a new value arrives, processing of the previous value that has not finished is cancelled.

The code below adds up the emitted numbers, resulting in 6.

flowOf(1, 2, 3)
    .reduce { accumulator, value -> accumulator + value }

fold is almost the same as reduce, except that it takes an initial value.
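As a rough side-by-side of the terminal operators listed above, here is a small sketch. The name terminalOperatorsDemo and the initial value 10 are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: applies several terminal operators to the same cold flow.
// Each terminal operator triggers its own collection of the flow.
fun terminalOperatorsDemo(): List<Any> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    listOf(
        numbers.toList(),                       // [1, 2, 3]
        numbers.first(),                        // 1
        numbers.last(),                         // 3
        numbers.reduce { acc, v -> acc + v },   // 1 + 2 + 3 = 6
        numbers.fold(10) { acc, v -> acc + v }  // 10 + 1 + 2 + 3 = 16
    )
}
```

Because the flow is cold, it can be collected several times, and every terminal operator here starts the emission from the beginning.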

  5. launchIn. Similar to collect, but the collection is launched in the given coroutine scope. Because launchIn takes no collector block, the values reaching the end of the stream are ignored, so it is commonly used together with onEach.

val scope = CoroutineScope(Dispatchers.IO)

flowOf(1, 2, 3)
    .onEach { print(it) }   // prints 1, 2, 3
    .launchIn(scope)
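A detail worth seeing in code is that launchIn returns a Job immediately instead of suspending like collect. The following sketch assumes a runBlocking scope purely for demonstration; launchInDemo is a made-up name.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches the collection in the runBlocking scope
// and waits for the returned Job before reading the results.
fun launchInDemo(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = flowOf(1, 2, 3)
        .onEach { seen += it } // side effect for each value
        .launchIn(this)        // returns a Job right away, does not suspend
    job.join()                 // wait until the flow has completed
    seen
}
```

In an Android app the scope would more typically be something like a ViewModel or lifecycle scope, and the Job can be used to cancel the collection.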

Intermediate operator

  1. transform

The intermediate operators are used to receive data transmitted upstream, intercept or transform it, and send it downstream. Intermediate operators do not trigger data collection.

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow {
    collect { value ->
        return@collect transform(value)
    }
}

transform itself only passes each upstream value to the given block. In most cases the block also has to call emit for the downstream to receive data. The other transformation operators are all built on top of transform.

A small detail: why is return used inside collect? Searching around shows that it is related to a Kotlin compiler bug: without the return, tail call elimination (TCE) is not possible. See the related issue for more information.
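Since transform leaves the call to emit up to the block, one block can emit zero, one or several values per input. A minimal sketch follows; the extension name evenTwice and the strings are invented for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator built on transform: odd values are dropped,
// even values produce two downstream elements.
fun Flow<Int>.evenTwice(): Flow<String> = transform { value ->
    if (value % 2 == 0) {
        emit("even $value")
        emit("again $value")
    }
}

fun transformDemo(): List<String> = runBlocking {
    flowOf(1, 2, 3, 4).evenTwice().toList()
}
```

This is the flexibility that operators such as map and filter give up in exchange for a simpler signature.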

  2. map

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
   return@transform emit(transform(value))
}

The map operator converts each upstream value into a new value using the given transform function and then sends it downstream. For example:

flowOf(1, 2, 3)
    .map { it * 2 }
    .collect { println(it) } // prints 2, 4, 6

  3. Flattening transformations: flatMapConcat, flatMapMerge and flatMapLatest

The flatMap family transforms values like map does, but each upstream value is converted into a new flow, and the resulting flows are then flattened into a single stream.

flatMapConcat collects the inner flows sequentially: the next flow starts only after the previous one has completed. flatMapMerge allows concurrency and collects several inner flows at the same time. The difference can be seen in an example from the official documentation.

First we define a function that takes an Int and converts it to a flow.

private fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // Wait 500 milliseconds
    emit("$i: Second")
}

Then we create the data flow and transform it using flatMapConcat.

val startTime = System.currentTimeMillis()
flowOf(1, 2, 3)
    .onEach { delay(100) } // Delay 100 ms
    .flatMapConcat { requestFlow(it) }
    .collect { Log.d("denny", "at ${System.currentTimeMillis() - startTime} ${Thread.currentThread()} $it") }

Notice the timestamp for each of the following logs:

D/denny: at 103 Thread[main @coroutine#5,5,main] 1: First
D/denny: at 604 Thread[main @coroutine#5,5,main] 1: Second
D/denny: at 704 Thread[main @coroutine#5,5,main] 2: First
D/denny: at 1205 Thread[main @coroutine#5,5,main] 2: Second
D/denny: at 1306 Thread[main @coroutine#5,5,main] 3: First
D/denny: at 1807 Thread[main @coroutine#5,5,main] 3: Second

If we replace the above example with flatMapMerge, then the data emission will be concurrent:

D/denny: at 107 Thread[main @coroutine#5,5,main] 1: First
D/denny: at 207 Thread[main @coroutine#5,5,main] 2: First
D/denny: at 308 Thread[main @coroutine#5,5,main] 3: First
D/denny: at 608 Thread[main @coroutine#5,5,main] 1: Second
D/denny: at 708 Thread[main @coroutine#5,5,main] 2: Second
D/denny: at 809 Thread[main @coroutine#5,5,main] 3: Second

flatMapMerge takes a concurrency parameter, the maximum number of inner flows collected concurrently. The default is 16. When concurrency equals 1, flatMapMerge behaves the same as flatMapConcat, as the source of flattenMerge shows:

@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

Replacing the operator in the example above with flatMapLatest prints:

D/denny: at 107 Thread[main @coroutine#5,5,main] 1: First
D/denny: at 229 Thread[main @coroutine#5,5,main] 2: First
D/denny: at 332 Thread[main @coroutine#5,5,main] 3: First
D/denny: at 832 Thread[main @coroutine#5,5,main] 3: Second

flatMapLatest only keeps the latest data. When a new value arrives, collection of the previous inner flow that has not completed is cancelled.
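The logs above can be reproduced off-device with a small sketch that collects the results into lists instead of logging them. It reuses the requestFlow shape from the example; compareFlatMaps is a made-up name, and the opt-in annotations are included on the assumption that these operators are still marked experimental or preview in the library version in use.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same shape as the requestFlow used in the article.
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

// Hypothetical helper: runs the same source through flatMapConcat and
// flatMapLatest and returns both result lists.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun compareFlatMaps(): Pair<List<String>, List<String>> = runBlocking {
    val source = flowOf(1, 2, 3).onEach { delay(100) }
    // Sequential: each inner flow finishes before the next one starts.
    val concat = source.flatMapConcat { requestFlow(it) }.toList()
    // Latest only: a new upstream value cancels the inner flow still running.
    val latest = source.flatMapLatest { requestFlow(it) }.toList()
    concat to latest
}
```

With these delays, the flatMapLatest list loses "1: Second" and "2: Second", because the inner flows for 1 and 2 are cancelled while they wait in delay(500).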

  4. Context switching: flowOn

Recall from above that emit in a flow is not thread-safe. Operators that can emit data must not call suspending functions such as withContext to change the coroutine context. Within a flow, only the flowOn operator may change the coroutine context.

flowOn can be called multiple times. Each flowOn affects the operators between itself and the previous flowOn (or the data source). Collection runs in the context of the coroutine that calls the terminal operator, and launchIn can be used to choose that scope.

val scope = CoroutineScope(Dispatchers.Main)
flow { emit(1) } // emit on IO
    .flowOn(Dispatchers.IO)
    .onEach { println(it) } // collect on Main
    .launchIn(scope)
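To see the effect of flowOn without an Android main thread, the following sketch records thread names instead. It uses Dispatchers.Default rather than IO or Main, and flowOnDemo is a made-up name; the exact worker thread names are an implementation detail of the dispatcher.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (thread that ran the builder, thread that collected).
fun flowOnDemo(): Pair<String, String> = runBlocking {
    var emitThread = ""
    var collectThread = ""
    flow {
        emitThread = Thread.currentThread().name // runs upstream of flowOn
        emit(1)
    }
        .flowOn(Dispatchers.Default)             // only affects the code above it
        .collect { collectThread = Thread.currentThread().name }
    emitThread to collectThread
}
```

The builder is expected to run on a Default dispatcher worker thread, while the collector stays on the thread that called runBlocking.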
  5. Event listeners: onEach, onStart, onCompletion and onEmpty

flow { emit(1) }
    .onStart { emit(2) } // Called before the upstream starts emitting, so 2 is emitted first, then 1
    .onEach { println("onEach $it") } // Runs for each upstream value before it is passed downstream, including values emitted in onStart
    .onEmpty { println("onEmpty") } // Runs when the upstream completes without emitting any value, and can emit extra elements. The upstream here also includes values emitted by operators such as onStart and onCompletion
    .onCompletion { println("onCompletion") } // Runs when the upstream completes, is cancelled, or fails
    .collect { println("collect $it") }

  6. Exception handling

Flow has several exception handling mechanisms. Most intuitively, you can use Kotlin's try/catch to catch exceptions.

try {
    flowOf(1, 2, 3)
        .onEach { check(it < 2) }
        .collect { println(it) }
} catch (e: Throwable) {
    println(Log.getStackTraceString(e))
}

The code above emits 1, then throws an exception when 2 is emitted, which is caught by the catch block. After that, the flow no longer emits any values.

Sometimes we expose a flow to external collectors, where a try/catch block is not a good fit. If we still want exception handling to stay transparent to the outside, we can use the catch operator.

flowOf(1, 2, 3)
    .onEach { check(it != 2) }
    .catch { println(Log.getStackTraceString(it)) }
    .onCompletion { cause -> println(Log.getStackTraceString(cause)) }
    .collect { println(it) }

Note that in the example above, the onCompletion operator can also observe exceptions and tell whether the flow completed successfully by checking whether cause is null. But unlike catch, onCompletion only observes the exception and does not handle it.

In addition, Kotlin provides retry and retryWhen to retry when an error occurs. No examples are given here.
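For completeness, here is a minimal sketch of retry combined with catch. The flakyFlow source, its failures parameter and the fallback value -1 are all invented for this illustration.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical source: emits 1, then fails on the first `failures` collections.
fun flakyFlow(failures: Int): Flow<Int> {
    var attempts = 0
    return flow {
        attempts++
        emit(1)
        if (attempts <= failures) throw IOException("attempt $attempts failed")
        emit(2)
    }
}

fun retryDemo(failures: Int): List<Int> = runBlocking {
    flakyFlow(failures)
        .retry(2) { cause -> cause is IOException } // re-collect the upstream up to 2 more times
        .catch { emit(-1) }                         // fallback once the retries are used up
        .toList()
}
```

Note that retry restarts the upstream from the beginning, so values emitted before the failure are seen again: retryDemo(2) yields 1, 1, 1, 2, while retryDemo(3) runs out of retries and ends with the fallback value.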

  7. Conflation: conflate

conflate makes the collector process only the latest values instead of every value.

flowOf(1, 2, 3)
    .onEach { delay(100) }
    .conflate()
    .collect {
        println("collect start $it")
        delay(500)
        println("collect end $it")
    }

Output:

D/denny: Thread[main @coroutine#5,5,main] collect start 1
D/denny: Thread[main @coroutine#5,5,main] collect end 1
D/denny: Thread[main @coroutine#5,5,main] collect start 3
D/denny: Thread[main @coroutine#5,5,main] collect end 3

This is similar to collectLatest mentioned earlier. collectLatest also processes only the latest values, but when a new value arrives, the block that is still processing the previous value is cancelled.

flowOf(1, 2, 3)
    .onEach { delay(100) }
    .collectLatest {
        println("collect start $it")
        delay(500)
        println("collect end $it")
    }

This code prints:

D/denny: Thread[main @coroutine#7,5,main] collect start 1
D/denny: Thread[main @coroutine#8,5,main] collect start 2
D/denny: Thread[main @coroutine#9,5,main] collect start 3
D/denny: Thread[main @coroutine#9,5,main] collect end 3

  8. Combining: combine and zip

combine merges the latest values of two data sources and sends the result downstream. zip pairs up the corresponding values of the two data sources and sends each pair downstream. For example, using combine to join two flows with different emission speeds:

val flow1 = flowOf(1, 2, 3)
    .onEach { delay(50) }
val flow2 = flowOf("one", "two", "three")
    .onEach { delay(100) }
flow1.combine(flow2) { arg1, arg2 ->
    "$arg1 $arg2"
}.collect { println(it) }

Output:

D/denny: Thread[main @coroutine#5,5,main] 1 one
D/denny: Thread[main @coroutine#5,5,main] 2 one
D/denny: Thread[main @coroutine#5,5,main] 3 one
D/denny: Thread[main @coroutine#5,5,main] 3 two
D/denny: Thread[main @coroutine#5,5,main] 3 three

With zip instead of combine, the output is:

D/denny: Thread[main @coroutine#5,5,main] 1 one
D/denny: Thread[main @coroutine#5,5,main] 2 two
D/denny: Thread[main @coroutine#5,5,main] 3 three

And you can see that even though the rates are different, there’s still a one-to-one correspondence.
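The two operators can be compared in one self-contained sketch that returns lists instead of logging. combineAndZipDemo is a made-up name. The intermediate combine results depend on timing, so only the zip result and the final combine element are treated as fixed here.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns (combine results, zip results) for the same two sources.
fun combineAndZipDemo(): Pair<List<String>, List<String>> = runBlocking {
    val numbers = flowOf(1, 2, 3).onEach { delay(50) }
    val names = flowOf("one", "two", "three").onEach { delay(100) }
    // combine: re-emits whenever either side produces a new value.
    val combined = numbers.combine(names) { n, s -> "$n $s" }.toList()
    // zip: waits for one value from each side and pairs them by position.
    val zipped = numbers.zip(names) { n, s -> "$n $s" }.toList()
    combined to zipped
}
```

zip always produces the three positional pairs, while combine always ends with the pair of the last values from both sides, whatever happened in between.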

  9. Buffering: buffer

Flows have no buffer by default. This operator adds a buffer to the flow. Once a buffer is added, the upstream keeps emitting as long as the buffer is not full, even if the downstream has not had time to consume the data.

val flow = flow {
    (1..5).forEach {
        println("emit start $it")
        emit(it)
        println("emit end $it")
    }
}.buffer(1)

flow.collect {
    println("collect $it")
    delay(200)
}

In the above code, the upstream transmitting rate is obviously higher than the downstream, but we have added a cache of capacity 1 for flow. Output:

D/denny: Thread[main @coroutine#6,5,main] emit start 1
D/denny: Thread[main @coroutine#6,5,main] emit end 1
D/denny: Thread[main @coroutine#6,5,main] emit start 2
D/denny: Thread[main @coroutine#6,5,main] emit end 2
D/denny: Thread[main @coroutine#6,5,main] emit start 3
D/denny: Thread[main @coroutine#5,5,main] collect 1
D/denny: Thread[main @coroutine#5,5,main] collect 2
D/denny: Thread[main @coroutine#6,5,main] emit end 3
D/denny: Thread[main @coroutine#6,5,main] emit start 4
D/denny: Thread[main @coroutine#5,5,main] collect 3
D/denny: Thread[main @coroutine#6,5,main] emit end 4
D/denny: Thread[main @coroutine#6,5,main] emit start 5
D/denny: Thread[main @coroutine#5,5,main] collect 4
D/denny: Thread[main @coroutine#6,5,main] emit end 5
D/denny: Thread[main @coroutine#5,5,main] collect 5

As you may have noticed, we added a buffer of 1, so why were two values emitted right at the start? As mentioned earlier, the buffer operator essentially returns a channelFlow, so this question is answered later in the Channel section.

  10. Other common operators
  • filter: an intermediate operator that filters the values of the upstream data stream and passes on only those that meet the condition.
  • take: an intermediate operator that takes only the specified number of elements from the upstream data stream and passes them downstream, discarding the rest.
  • debounce: an intermediate operator that passes only the latest value downstream within the specified time window. It is used to filter high-frequency upstream producers.
  • distinctUntilChanged: an intermediate operator that filters out consecutive duplicate elements in the upstream data stream. It is equivalent to the distinctUntilChanged operator in RxJava.
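A short sketch chaining three of these operators may help, in particular to show that distinctUntilChanged only removes consecutive repeats. The input values and the name commonOperatorsDemo are arbitrary.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper chaining distinctUntilChanged, filter and take.
fun commonOperatorsDemo(): List<Int> = runBlocking {
    flowOf(1, 1, 2, 2, 3, 1, 4)
        .distinctUntilChanged() // 1, 2, 3, 1, 4 (the later 1 is kept: it is not a consecutive repeat)
        .filter { it != 3 }     // 1, 2, 1, 4
        .take(3)                // 1, 2, 1
        .toList()
}
```

debounce is left out because its result depends on timing rather than on the values alone.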

Flow cancellation detection

The flow builder calls ensureActive internally on each emission to check whether the collecting coroutine has been cancelled. If it has, a CancellationException is thrown. In the following code, an exception is thrown when the flow tries to emit the fourth number.

flow { (1..5).forEach { emit(it) } }
    .collect { if (it == 3) cancel() }

However, flows created with extension functions such as IntRange.asFlow skip these cancellation checks for performance reasons. They emit all the numbers, and the cancellation is only detected before returning from runBlocking.
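The difference can be observed with the sketch below, which also includes the cancellable operator as one way to restore the check for asFlow. The helper names collectUntilThree and cancellationDemo are made up, and each collection runs in its own launched coroutine so that cancel() affects only that coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects `source` in a child coroutine that cancels
// itself when it sees 3, and returns every value the collector received.
private suspend fun collectUntilThree(source: Flow<Int>): List<Int> = coroutineScope {
    val seen = mutableListOf<Int>()
    launch {
        source.collect {
            seen += it
            if (it == 3) cancel() // cancels only this launched coroutine
        }
    }.join()
    seen
}

fun cancellationDemo(): Triple<List<Int>, List<Int>, List<Int>> = runBlocking {
    Triple(
        collectUntilThree(flow { (1..5).forEach { emit(it) } }), // builder checks on each emit
        collectUntilThree((1..5).asFlow()),                      // no check between elements
        collectUntilThree((1..5).asFlow().cancellable())         // check added back
    )
}
```

The flow builder and the cancellable variant stop after 3, while the plain asFlow version delivers all five numbers before the cancellation takes effect.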

Backpressure

By default, a flow emits and collects in the same coroutine context, and both emit and collect are suspending functions, so a value is emitted only when the collector is ready for it. However, when the buffer or flowOn operators are used, the flow is converted to a channelFlow, which supports backpressure. The specific backpressure strategies will be covered together with channelFlow.

Channel

Introduction

Channels predate flows and were originally designed for communication between coroutines. Since the arrival of Flow, Channel has gradually faded into the background, although it still appears throughout the Flow source code. Its responsibilities have become increasingly narrow: it mainly serves as a concurrency-safe buffer queue for communication between coroutines.

The design of Channel is similar to BlockingQueue in Java. The difference is that a Channel does not block a thread; instead it provides the suspending functions send and receive.

Multiple coroutines can send data to the same channel when needed, and data from a channel can be received by multiple coroutines.

When multiple coroutines receive from the same channel, each element is consumed exactly once, by one of the consumers. A consumed element is automatically removed from the channel.
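The "consumed exactly once" property can be checked with a small fan-out sketch. The name fanOutDemo, the two consumers and the range 1..10 are arbitrary choices; which consumer gets which element is not specified here, only that nothing is lost or duplicated.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two consumers read from one channel; the combined
// result is sorted so that it does not depend on who received what.
fun fanOutDemo(): List<Int> = runBlocking {
    val channel = Channel<Int>()
    val received = mutableListOf<Int>() // safe here: runBlocking uses a single thread
    val consumers = List(2) {
        launch { for (x in channel) received += x }
    }
    for (i in 1..10) channel.send(i)
    channel.close()                     // ends both for loops
    consumers.forEach { it.join() }
    received.sorted()
}
```

If every consumer saw every element, the list would contain 20 entries; with a channel it contains each number once.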

Cold Flow, hot Channel

Unlike Flow, the sender of a Channel sends data even when there is no receiver.

Definition

A Channel is essentially an interface:

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

A Channel cannot be instantiated directly. However, it can be created using the factory function provided by the library:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

Parameters

  1. capacity

Capacity is the capacity of a channel. Kotlin defines several constants for us:

  • RENDEZVOUS

    No buffer. An element is handed over only when a consumer calls receive; until then, the send operation is suspended. This is also the default type for a channel.

  • CONFLATED

    If capacity is set to this value, the onBufferOverflow parameter must be BufferOverflow.SUSPEND.

    The buffer holds a single element that is always replaced by the latest one, and the previous element is discarded. This is equivalent to onBufferOverflow being DROP_OLDEST.

  • UNLIMITED

    Unlimited capacity. The buffer queue keeps growing as elements are added, until memory runs out (OOM).

  • BUFFERED

    Creates a buffer queue with the default capacity of 64 elements. When the queue is full, send is suspended until space becomes available.

    We can also pass a number directly to create a channel with that buffer size.

  2. onBufferOverflow

Specifies the backpressure policy when the buffer is full. There are 3 options:

  • SUSPEND: suspend the sender
  • DROP_OLDEST: discard the oldest element in the buffer
  • DROP_LATEST: discard the latest element, the one currently being added

  3. onUndeliveredElement

Specifies a callback that is invoked when an element was sent but not delivered to a receiver.
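The effect of capacity and onBufferOverflow can be compared with a sketch that uses the non-suspending trySend and tryReceive, so that nothing depends on timing. The helper names drain and capacityDemo and the map keys are invented for this illustration.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: offers 1..5 without suspending (no receiver is waiting),
// closes the channel, and returns whatever is still buffered.
fun drain(channel: Channel<Int>): List<Int> {
    (1..5).forEach { channel.trySend(it) } // failed offers are simply ignored
    channel.close()
    val received = mutableListOf<Int>()
    while (true) {
        received += channel.tryReceive().getOrNull() ?: break
    }
    return received
}

fun capacityDemo(): Map<String, List<Int>> = mapOf(
    "rendezvous" to drain(Channel()),                                // no buffer: every offer fails
    "conflated" to drain(Channel(Channel.CONFLATED)),                // keeps only the latest
    "unlimited" to drain(Channel(Channel.UNLIMITED)),                // keeps everything
    "suspend(2)" to drain(Channel(2)),                               // offers fail once full
    "dropOldest(2)" to drain(Channel(2, BufferOverflow.DROP_OLDEST)),
    "dropLatest(2)" to drain(Channel(2, BufferOverflow.DROP_LATEST))
)
```

With trySend, the SUSPEND policy shows up as a failed offer rather than a suspension, which is why "suspend(2)" and "dropLatest(2)" end up holding the same two elements in this sketch.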

Basic usage of Channel

Besides calling send and receive to send and receive elements, you can iterate over the elements of a channel directly:

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // We finish sending
}
// Here we use the 'for' loop to print all received elements (until the channel is closed)
for (y in channel) println(y)
println("Done!")

Channel is fair

The send and receive operations of a Channel are fair. If send and receive are called from multiple coroutines, they are served in first-in, first-out order.

Here is a quiz to help you understand Channel:

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch { // Coroutine body A
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch { // Coroutine body B
        channel.send("B1")
        log("B done")
    }
    launch { // Coroutine body C
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

The code above starts two sending coroutines that send three values in total, and a third coroutine that receives and prints the three values. What is the output?

The answer:

[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2

To understand the above results, there are two things to understand:

  • The default channel type is RENDEZVOUS, with no buffer. A send and a receive must therefore meet one-to-one to succeed; otherwise the caller is suspended.
  • Channel is fair.

The code above is executed in the following order:

  • Coroutine body A tries to send A1, but is suspended because there is no receiver.
  • Coroutine body B tries to send B1, but is suspended because there is no receiver.
  • Coroutine body C receives the values from A and B in turn and prints them, then is suspended on the third receive because there is no sender.
  • Coroutine body A sends A2. A receiver is already waiting, so the value is handed over directly. A prints A done and finishes.
  • Coroutine body B prints B done and finishes.
  • Coroutine body C receives the last value, A2, prints it, and finishes.

With this example understood, you can see why the flow with buffer(1) emitted two values at the start: the first value is handed directly to the waiting collector and the second fills the buffer, so the emitter is only suspended on the third.

Select expression

Definition

select is a special mechanism often used with channels. It lets you wait on multiple suspending operations at once and returns the result of whichever completes first.

select fits scenarios where we pull data from both the network and a local cache and display whichever result comes back first.

select is defined as follows:

public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        val scope = SelectBuilderImpl(uCont)
        try {
            builder(scope)
        } catch (e: Throwable) {
            scope.handleBuilderException(e)
        }
        scope.getResult()
    }
}

select takes a lambda with a SelectBuilder receiver as its argument. Let's look at the definition of SelectBuilder:

public interface SelectBuilder<in R> {
    public operator fun SelectClause0.invoke(block: suspend () -> R)

    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)

    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
}

SelectBuilder defines a set of invoke extension functions on the SelectClauseN types. So inside select, we use a property that returns one of the SelectClauseN types as a clause.

The library defines a number of select clauses for us:

  • The Deferred interface:

public interface Deferred<out T> : Job {
    public val onAwait: SelectClause1<T>
}

  • The ReceiveChannel interface:

public interface ReceiveChannel<out E> {
    public val onReceive: SelectClause1<E>

    public val onReceiveCatching: SelectClause1<ChannelResult<E>>
}

  • The SendChannel interface:

public interface SendChannel<in E> {
    public val onSend: SelectClause2<E, SendChannel<E>>
}
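The onAwait clause maps naturally onto the cache-versus-network scenario mentioned earlier. The sketch below simulates both sources with delay; fetchFastest, the delays and the returned strings are placeholders, not a real data layer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical example: race a simulated cache read against a simulated
// network call and return whichever Deferred completes first.
suspend fun fetchFastest(): String = coroutineScope {
    val fromCache = async { delay(100); "cache" }      // stand-in for a local read
    val fromNetwork = async { delay(1000); "network" } // stand-in for a remote call
    val winner = select<String> {
        fromCache.onAwait { it }
        fromNetwork.onAwait { it }
    }
    coroutineContext.cancelChildren() // stop the slower request
    winner
}
```

Cancelling the children after select matters here: without it, coroutineScope would wait for the slower request to finish before returning.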

An example

Here is an example from the official Kotlin tutorial:

fun CoroutineScope.fizz() = produce<String> {
    while (true) { // Send "Fizz" every 300 milliseconds
        delay(300)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // Send "Buzz!" every 500 milliseconds
        delay(500)
        send("Buzz!")
    }
}

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // This is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // This is the second select clause
            println("buzz -> '$value'")
        }
    }
}

Use:

val fizz = fizz()
val buzz = buzz()
repeat(7) {
    selectFizzBuzz(fizz, buzz)
}
coroutineContext.cancelChildren() // Cancel the fizz and buzz coroutines

This code is executed as follows:

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'

Note that select is biased toward the first clause: when several clauses are ready at the same time, the first one in the SelectBuilder is selected, then the next, and so on. If you need completely fair selection, use selectUnbiased.

Follow-up TODO

Next, if I have time, I will write more usage and source analysis of ChannelFlow, SharedFlow and StateFlow.

References

  • Kotlin official documentation
  • Kotlin Flow manual (1): basic use
  • Kotlin Flow guide (2): ChannelFlow
  • Most comprehensive Kotlin coroutines: Coroutine/Channel/Flow and practical applications
  • KotlinTutorials