What is an “asynchronous data flow”? In what business scenario is it useful? What’s the principle behind it? Read the source code for Flow and try to answer these questions.

Synchronous & asynchronous & continuously asynchronous

Synchronous and asynchronous are terms used to describe "calls":

  • Synchronous call: when the initiator triggers a synchronous call, it waits until the call completes and returns a result before executing the subsequent code. Obviously, this serializing effect occurs only when the caller's and callee's code execute in the same thread.
  • Asynchronous call: when the initiator triggers an asynchronous call, it does not wait for the called code to complete, because the asynchronous call returns immediately without the execution result; the result is delivered to the caller later via an asynchronous notification such as a callback (see the sketch after this list). This parallel effect occurs when the caller's and callee's code execute in different threads.
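To make the contrast concrete, here is a minimal sketch of the two call styles; loadSync() and loadAsync() are illustrative names, not from any library:

import kotlin.concurrent.thread

// Synchronous: the caller blocks until the result is returned
fun loadSync(): String = "data"

// Asynchronous: returns immediately; the result is delivered later via a callback
fun loadAsync(callback: (String) -> Unit) {
    thread {
        // ... time-consuming work on another thread ...
        callback("data") // the caller is notified asynchronously
    }
}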

Asynchronous calls are ubiquitous in app development; time-consuming operations, such as writing a file, are often placed on another thread:

suspend fun writeFile(content: String) {
    // write the file
}

// Start a coroutine to write the file
val content = "xxx"
coroutineScope.launch { writeFile(content) }

A suspend function in Kotlin expresses a single asynchronous process. How do we express "multiple continuously generated asynchronous processes"?

The for loop is the first solution that comes to mind:

val contents = listOf<String>(...) // Multiple strings to be written to the file
contents.forEach { string ->
    coroutineScope.launch { writeFile(string) }
}

The prerequisite for using a for loop is having all the data to be operated on asynchronously up front. But with "multiple continuously generated data", the data is generated bit by bit and cannot be obtained all at once. For example: "count down for 1 minute, perform a time-consuming operation every 2 seconds, and when time is up, accumulate all the operation results and print them on the main thread". This is when it pays to rethink the problem with an "asynchronous data flow".

Asynchronous data flows describe this scenario with a "producer/consumer" model: the timer is the producer, generating a new piece of data every two seconds; the accumulator is the consumer, accumulating all the asynchronously produced data. It is as if a pipeline runs between producer and consumer: the producer inserts data at one end and the consumer fetches it from the other. Because of the pipeline, the data stays ordered, on a first-in, first-out basis.

The traditional solution

Before presenting the Flow solution, let's take a look at the traditional one.

The first step is to implement a timer that performs an asynchronous operation at fixed intervals on a background thread. A thread pool is perfect for this:

// Countdown timer
class Countdown<T>(
    private var duration: Long, // total countdown duration
    private var interval: Long, // countdown interval
    private val action: (Long) -> T // background task executed on each tick
) {
    // Accumulated value of all task results
    var acc: Any? = null
    // Remaining countdown time
    private var remainTime = duration
    // Callback invoked when the task starts
    var onStart: (() -> Unit)? = null
    // Callback invoked when the task ends
    var onEnd: ((T?) -> Unit)? = null
    // Accumulator for task results
    var accumulator: ((T, T) -> T)? = null
    // Runnable wrapping the countdown task
    private val countdownRunnable by lazy { CountDownRunnable() }
    // Handler for callbacks on the main thread
    private val handler by lazy { Handler(Looper.getMainLooper()) }
    // Thread pool
    private val executor by lazy { Executors.newSingleThreadScheduledExecutor() }

    // Start the countdown
    fun start(delay: Long = 0) {
        if (executor.isShutdown) return
        // Call back on the main thread that the countdown has started
        onStart?.let { handler.post(it) }
        executor.scheduleAtFixedRate(countdownRunnable, delay, interval, TimeUnit.MILLISECONDS)
    }

    // Wrap the countdown task as a Runnable
    private inner class CountDownRunnable : Runnable {
        override fun run() {
            remainTime -= interval
            // Execute the background task and get its return value
            val value = action(remainTime)
            // Accumulate the value returned by the task
            acc = if (acc == null) value else accumulator?.invoke(acc as T, value)
            if (remainTime <= 0) {
                // Shut down the countdown
                executor.shutdown()
                // Call back on the main thread that the countdown has ended
                handler.post { onEnd?.invoke(acc as? T) }
            }
        }
    }
}

Countdown abstracts the execution of a background countdown task. It builds a single-threaded scheduled thread pool via Executors.newSingleThreadScheduledExecutor() and uses scheduleAtFixedRate() to execute the countdown task at fixed intervals.

The external countdown task is expressed as (Long) -> T, a lambda that takes the remaining countdown time as input and outputs the result of the asynchronous task. Internally it is wrapped in a Runnable whose run() method implements the countdown and accumulation logic.

Then you can use it like this:

Countdown(60_000, 2_000) { remainTime -> calculate(remainTime) }.apply {
    onStart = { Log.v("test", "countdown start") }
    onEnd = { ret -> Log.v("test", "countdown end, ret=$ret") }
    accumulator = { acc, value -> acc + value }
}.start()

You do have to introduce some complexity, such as the thread pool, the Handler, and the accumulator. But thanks to class encapsulation and Kotlin's syntactic sugar, the final call site is succinct.

The Flow solution

Using Flow eliminates this complexity:

fun <T> countdown(
    duration: Long,
    interval: Long,
    onCountdown: suspend (Long) -> T
): Flow<T> =
    flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
        .onEach { delay(interval) }
        .onStart { emit(duration) }
        .map { onCountdown(it) }
        .flowOn(Dispatchers.Default)

This defines a top-level function countdown() that returns a stream instance, which produces countdown values on a background thread and passes each value to the asynchronous task onCountdown() for execution. Then you can use it like this:

val mainScope = MainScope()
mainScope.launch {
    val ret = countdown(60_000, 2_000) { remainTime -> calculate(remainTime) }
        .onStart { Log.v("test", "countdown start") }
        .onCompletion { Log.v("test", "countdown end") }
        .reduce { acc, value -> acc + value }
    Log.v("test", "countdown acc ret = $ret")
}

Starting from the source code, let's analyze the principles behind the Flow solution bit by bit.

How does Flow produce and consume data?

The definition of Flow is extremely simple, consisting of only two interfaces:

public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

Flow is an interface that defines a single collect() method, meaning "a stream can be collected". The collector is itself an interface:

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

The stream collector interface defines an emit() method that means “the stream collector can emit data”.

Using the “producer/consumer” model, it can be understood that the data in the stream can be consumed and the stream collector can produce the data.

A simple production and consumption data scenario:

// Start a coroutine
GlobalScope.launch {
    // Build the flow
    flow { // Define how the stream produces data
        (1..3).forEach {
            // Emit one number every second
            delay(1000)
            emit(it)
        }
    }.collect { // Define how the data is consumed
        Log.v("test", "num=$it") // Print the number
    }
}
  • A flow is constructed with flow {block}, where flow() is a top-level function:

    // Build a safe stream (pass in a block that defines how the stream's data is produced)
    public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
        SafeFlow(block)

    // The safe stream inherits from the abstract stream
    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block() // block is invoked when the flow is collected, which triggers data production
        }
    }

    // The abstract stream
    public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
        // The concrete implementation of collecting data
        public final override suspend fun collect(collector: FlowCollector<T>) {
            // Build a SafeCollector and pass it to collectSafely()
            val safeCollector = SafeCollector(collector, coroutineContext)
            try {
                collectSafely(safeCollector)
            } finally {
                safeCollector.releaseIntercepted()
            }
        }

        public abstract suspend fun collectSafely(collector: FlowCollector<T>)
    }

    The block in flow {block} defines how data is produced, and the block is called in collect(). So the data in the stream is not automatically produced until the moment the stream is collected.

  • The flow is collected with collect {action}, where the action defines how the data is consumed. collect() is an extension function of Flow:

    public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
        collect(object : FlowCollector<T> {
            override suspend fun emit(value: T) = action(value)
        })

    A new stream collector is created when the data is collected. This collector forwards each emitted value directly to the action, i.e. the data consumer.

Put these two points together:

  1. The data in the stream is not automatically produced until the moment the stream is collected. At the moment the stream is collected, the data is produced and emitted, passing it to the consumer via the stream collector.

  2. Streams and stream collectors are paired concepts. A stream is a set of sequentially produced data that is transmitted through a stream collector; the collector acts as the stream's data container (although it holds no data) and defines how the data is passed to the consumer.

So the above example code is no different than the following synchronous call:

// Producer/consumer pseudocode
flow {
    emit(data) // production
}.collect {
    action(data) // consumption
}

// The actual producer/consumer call chain
Flow.collect {
    emit(data) {
        action(data)
    }
}

After a few layers of lambda abstraction, it looks as if producer and consumer are separate, but in fact they form a synchronous call chain running in the same thread. In other words:

By default, production and consumption of data in a stream occur in the same thread.
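A minimal sketch to verify this claim, assuming an Android Log environment (println would work equally well):

GlobalScope.launch {
    flow {
        Log.v("test", "produce on ${Thread.currentThread().name}")
        emit(1)
    }.collect {
        Log.v("test", "consume on ${Thread.currentThread().name}")
    }
    // Both lines print the same thread name
}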

Now take a look at how the countdown stream produces and consumes data:

fun <T> countdown(
    duration: Long,
    interval: Long,
    onCountdown: suspend (Long) -> T
): Flow<T> =
    flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }
        .onEach { delay(interval) }
        .onStart { emit(duration) }
        .map { onCountdown(it) }
        .flowOn(Dispatchers.Default)

The first line of the countdown() method defines how the countdown stream produces data:

flow { (duration - interval downTo 0 step interval).forEach { emit(it) } }

flow {} builds a flow instance. Inside it, a progression from duration - interval down to 0 with step interval is created and traversed, calling emit() to emit each value.
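A quick way to see what the producer emits, assuming duration = 60_000L and interval = 2_000L as in the demo:

fun main() {
    val duration = 60_000L
    val interval = 2_000L
    // Prints [58000, 56000, ..., 2000, 0]; onStart { emit(duration) } later prepends 60000
    println((duration - interval downTo 0 step interval).toList())
}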

After the stream instance is created, a series of methods are called in chain, but there is no collect(). Does that mean that the countdown() method only defines how to produce data but not how to consume it?

collect() is the consumer of stream data, but "intermediate consumers" can be inserted into the pipeline between producer and consumer; they consume the upstream data first and then forward it downstream. It is these intermediate consumers that give Flow its endless variety of uses.

Intermediate consumer

transform()

transform(), one of the most commonly used intermediate consumers, is an extension function of Flow:

public inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> =
    // Build the downstream stream
    flow {
        // Collect the upstream data (this logic runs when the downstream stream is collected)
        collect { value ->
            // Process the upstream data
            return@collect transform(value)
        }
    }

transform() does three things: it builds a new stream (the downstream stream); when the downstream stream is collected, it immediately collects the upstream stream; and when an upstream value is collected, it passes it to the transform lambda.

FlowCollector

suspend FlowCollector<R>.(value: T) -> Unit is a lambda with a receiver, and the receiver is FlowCollector. When such a lambda is called, a receiver must be specified; in the context of transform() the receiver is this, so it is omitted. Written out in full, it looks like this:

public inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> =
    // Build the downstream stream
    flow { // this: FlowCollector<R>
        collect { value ->
            return@collect this.transform(value)
        }
    }

// The lambda in the flow {} that builds the stream is also one with a receiver
public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    SafeFlow(block)

Having FlowCollector as the receiver is convenient: FlowCollector.emit() can be called directly inside the lambda. transform() delegates the policy of "how the downstream stream produces data" to the incoming lambda, which is the strategy pattern (see the earlier article "One sentence to summarize the design patterns that lead to the same destination: factory pattern ≈ strategy pattern ≈ template method pattern" for details). So it can be concluded that:

transform() establishes an intercept-and-forward mechanism on a stream: it creates a new downstream stream that produces its data by collecting the upstream data and forwarding it to a lambda that is itself able to emit. As an intermediate consumer, transform() intercepts upstream data and can transform it at will before forwarding it to the downstream consumer.
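A small sketch of using transform() directly as an intermediate consumer; the duplication logic is illustrative, not from the article:

GlobalScope.launch {
    flowOf(1, 2, 3)
        .transform { value ->
            // The FlowCollector receiver lets us emit any number of times per upstream value
            emit(value)
            emit(value)
        }
        .collect { Log.v("test", "$it") } // prints 1 1 2 2 3 3
}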

onEach() & map() & custom intermediate consumers

So transform() is usually used to define a new intermediate consumer, and the definition of onEach() relies on it:

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
    action(value)
    return@transform emit(value)
}

All intermediate consumers are defined as extension methods to Flow and all return a newly created downstream Flow. This is done so that different intermediate consumers can be easily chained together by chaining calls.

onEach() builds a downstream stream through transform() and, before forwarding each upstream value, does one extra thing expressed by the action lambda.

map() is also implemented with transform():

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> =
    transform { value -> return@transform emit(transform(value)) }

map() builds a downstream stream through transform() and transforms each upstream value before forwarding it.

The transform() mechanism makes it easy to customize an intermediate consumer:

fun <T, R> Flow<T>.filterMap(
    predicate: (T) -> Boolean,
    transform: suspend (T) -> R
): Flow<R> =
    transform { value -> if (predicate(value)) emit(transform(value)) }

filterMap() transforms and emits only the upstream values that satisfy the predicate.
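A usage sketch for the custom filterMap() above (the predicate and mapping are illustrative):

GlobalScope.launch {
    flowOf(1, 2, 3, 4)
        .filterMap(predicate = { it % 2 == 0 }) { "num=$it" }
        .collect { Log.v("test", it) } // prints num=2, num=4
}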

onStart()

onStart() is also an intermediate consumer, but instead of using transform(), it builds the downstream stream with unsafeFlow():

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Build the downstream stream
    val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
    try {
        safeCollector.action() // Perform the action before collecting the upstream data
    } finally {
        safeCollector.releaseIntercepted()
    }
    collect(this) // Collect the upstream data
}

internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    // Build a new stream
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}

unsafeFlow() instantiates the Flow interface directly and defines what happens when the stream is collected, which is simply to call the block. So onStart() is similar to transform() in that both create a new downstream stream that collects the upstream one, except that onStart() performs the action before collecting, i.e. before any data is emitted.

onCompletion()

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Build the downstream stream
    try {
        collect(this) // 1. Collect the upstream data first
    } catch (e: Throwable) {
        ThrowingCollector(e).invokeSafely(action, e)
        throw e
    }
    val sc = SafeCollector(this, currentCoroutineContext())
    try {
        sc.action(null) // 2. Then perform the action
    } finally {
        sc.releaseIntercepted()
    }
}

The implementation of onCompletion() is similar to onStart(), except that the action is performed after the data has been collected.

Because onStart() and onCompletion() are both implemented as a downstream stream wrapping the upstream one, differing only in the order of collecting the data and performing the action, they produce the following interesting effect:

GlobalScope.launch {
    flow {
        (1..3).forEach {
            delay(1000)
            emit(it)
        }
    }.onStart { Log.v("test", "start1") }
        .onStart { Log.v("test", "start2") }
        .onCompletion { Log.v("test", "complete1") }
        .onCompletion { Log.v("test", "complete2") }
        .collect { Log.v("test", "$it") }
}

The output of the above code is as follows:

start2
start1
1
2
3
complete1
complete2

When several onStart {action} calls appear in a chain, the later action executes first, because the downstream stream built by the later onStart() wraps the upstream onStart(), and its action is executed before the upstream stream is collected.

This conclusion does not apply to onCompletion {action}: although the downstream stream built by a later onCompletion() also wraps the upstream one, its action is always executed after the upstream stream has been collected, so the actions run in declaration order.

End consumer

All of the above extension methods are called “intermediate consumers” because they build a new downstream stream, and they collect upstream streams only when downstream streams are collected. That is, if the downstream flow is not collected, the data in the flow will never be emitted, a feature known as cold flow.

Look at an example of cold flow:

// Eager: executes as soon as it is called
suspend fun get(): List<String> =
    listOf("a", "b", "c").onEach {
        delay(1000)
        print(it)
    }

// Declarative
fun get(): Flow<String> =
    flowOf("a", "b", "c").onEach {
        delay(1000)
        print(it)
    }

When the two get() functions are called, the first prints its results immediately, while the second prints nothing, because the second get() merely declares how to build a cold stream; it is never collected and therefore emits no data.

Flow is a cold Flow that does not emit data until it is collected, so cold Flow is “declarative”.
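To confirm, collecting the declarative version is what actually triggers the emissions; collect() with no arguments is the simplest end consumer:

GlobalScope.launch {
    get().collect() // only now do "a", "b", "c" print, one per second
}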

All consumers that can trigger the collection of data are called end consumers. They are like the spark that lights a firecracker: the nesting-doll layers of intermediate-consumer streams are collected one by one from the outside in (from downstream to upstream) until the original stream is reached and data emission is triggered.

The reduce() in the countdown demo is an end consumer:

val mainScope = MainScope()
mainScope.launch {
    val ret = countdown(60_000, 2_000) { io(it) }
        .onStart { Log.v("test", "countdown start") }
        .onCompletion { Log.v("test", "countdown end") }
        .reduce { acc, value -> acc + value } // End consumer: sums all asynchronous results
    // Because reduce() is a suspend method, the coroutine is suspended until the countdown completes, and only then is the sum of all asynchronous results printed
    Log.v("test", "countdown acc ret = $ret")
}

The source code for reduce() is as follows:

public suspend fun <S, T : S> Flow<T>.reduce(
    operation: suspend (accumulator: S, value: T) -> S // The accumulation algorithm
): S {
    var accumulator: Any? = NULL
    // Collect the data
    collect { value ->
        // Accumulate the collected value
        accumulator = if (accumulator !== NULL) {
            operation(accumulator as S, value)
        } else {
            value
        }
    }

    if (accumulator === NULL) throw NoSuchElementException("Empty flow can't be reduced")
    // Return the accumulated result
    return accumulator as S
}

reduce() does not build a new stream; it collects the data directly, accumulates it, and returns the result.

All end consumers are suspend methods, which means that data collection must take place in coroutines. The demo uses MainScope to start the coroutine, so the sum of the asynchronous results is printed in the main thread.
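For comparison, here are a few other common end consumers from kotlinx.coroutines; all are suspend functions, so they must run inside a coroutine:

GlobalScope.launch {
    val nums = flowOf(1, 2, 3)
    val list = nums.toList()                              // [1, 2, 3]
    val sum = nums.fold(0) { acc, value -> acc + value }  // 6; like reduce() but with an initial value
    val first = nums.first()                              // 1
    Log.v("test", "list=$list sum=$sum first=$first")
}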

Threading

The last operator left in the demo is flowOn(). It too is an intermediate consumer, and a somewhat complicated one, so it will be analyzed next time due to space constraints. That doesn't stop you from understanding its effect: it switches the thread on which all upstream code executes, without changing the thread on which downstream code executes.

The countdown() method runs the countdown task in the background through flowOn(Dispatchers.Default). The reduce() call occurs after flowOn(), so the asynchronous task results are still accumulated on the main thread.
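A minimal sketch of that effect; the log messages are illustrative:

MainScope().launch {
    flowOf(1, 2, 3)
        .onEach { Log.v("test", "upstream on ${Thread.currentThread().name}") } // a Default dispatcher thread
        .flowOn(Dispatchers.Default) // switches the thread for everything above this line
        .collect { Log.v("test", "downstream on ${Thread.currentThread().name}") } // the main thread
}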

In onStart(), onEach(), onCompletion(), map(), and reduce(), the consumer's processing of data is wrapped in suspend-marked lambdas. This means that with coroutines you can easily switch the thread each consumer runs on. And it is precisely because of suspend that downstream consumers running in the same thread experience no backpressure: a downstream consumer's suspend function naturally throttles the speed at which upstream data is produced (see the sketch below).
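A sketch of this natural backpressure: the slow consumer suspends inside collect {}, which in turn suspends the producer's emit():

GlobalScope.launch {
    flow {
        (1..3).forEach {
            Log.v("test", "emit $it")
            emit(it) // suspends until the collector below finishes processing
        }
    }.collect {
        delay(1000) // a slow consumer
        Log.v("test", "consume $it")
    }
    // Output alternates: emit 1, consume 1, emit 2, consume 2, emit 3, consume 3
}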

Conclusion

  1. An asynchronous data flow can be understood as data generated sequentially on a timeline, and it can express multiple continuous asynchronous processes.
  2. Asynchronous data flows can also be understood through a "producer/consumer" model: it is as if a pipeline runs between producer and consumer, with the producer inserting data at one end and the consumer fetching it from the other. Because of the pipeline, the data stays ordered, on a first-in, first-out basis.
  3. In Kotlin, a suspend method expresses a single asynchronous process, while Flow expresses multiple consecutive asynchronous processes. Flow is a cold stream; a cold stream does not emit data until the moment it is collected, which is why cold streams are "declarative".
  4. The moment a Flow is collected, data is produced and emitted through the stream collector FlowCollector, which passes it to the consumer. Streams and stream collectors are paired concepts: a stream is a set of sequentially produced data that is transmitted through a stream collector, which acts as the stream's data container (although it holds no data) and defines how the data is passed to the consumer.
  5. Intermediate consumers can be inserted between the producer and consumer of an asynchronous data flow. An intermediate consumer establishes an intercept-and-forward mechanism on the stream: a new downstream stream that produces its data by collecting the upstream data and forwarding it to a lambda that can itself emit. A stream with multiple intermediate consumers is like a "nesting doll", with each downstream stream wrapping the upstream one. Intercepting the original data this way, an intermediate consumer can change it at will before forwarding it to the downstream consumer.
  6. All consumers that can trigger the collection of data are called end consumers. They are like the spark that lights a firecracker, causing the nesting-doll layers of intermediate consumers to be collected one by one from the outside in (from downstream to upstream) until the original stream is reached and data emission is triggered.
  7. By default, the production and consumption of a stream's data occur in the same thread, but flowOn() can change the thread on which the upstream stream executes without affecting the thread of the downstream stream.
  8. In Flow, data production and consumption operations are wrapped in suspend-marked lambdas; with coroutines, asynchronous production and asynchronous consumption are easy to achieve.

The next article will continue by introducing how to implement flow limiting with Flow. Follow to get timely update reminders ~

Recommended reading

  • Kotlin basics | delegation and its applications
  • Kotlin basic syntax | refusing noise
  • Kotlin advanced | invariance, covariance, and contravariance
  • Kotlin in action | a year on, refactoring a custom view with Kotlin
  • Kotlin in action | killing XML layout files with syntactic sugar
  • Kotlin basics | understanding Kotlin collection operations literally
  • Kotlin source code | a magic weapon for reducing code complexity
  • Why is Kotlin coroutines' CoroutineContext designed as an indexed set? (1)
  • Kotlin advanced | usage scenarios for the asynchronous data stream Flow