LiveData laid the foundation for reactive programming on Android by providing a data-observer framework. Compared with asynchronous frameworks such as RxJava, however, it is rather thin, and it is often criticized for that. This is where Flow comes in.

Flow, as an asynchronous data-stream framework, is roughly on par with RxJava, but thanks to coroutines and Kotlin's syntactic sugar and DSL syntax, Flow code can be written very succinctly. You get to work with the pleasant parts, while the ugly details are handled by the compiler. Moreover, Flow, as an evolution of LiveData, integrates well with Jetpack as a member of that family, which helps keep the architecture unified.

To understand Flow, we first need to understand its operators and basic functions. Without them, it is hard to use Flow flexibly. This section therefore summarizes the basics of Flow.

Flow preface

First, let's look at a new concept: cold flows and hot flows. If you read articles about Flow online, chances are you will come across these terms.

Flow is cold, and Channel is hot.

An asynchronous data stream usually consists of three parts:

  • The upstream
  • The operator
  • The downstream

A cold flow is one in which the upstream produces no data while the downstream is not consuming. Only when the downstream starts to consume does the upstream start producing, and it produces from the beginning each time.

A hot flow is one in which the upstream produces data on its own, regardless of whether anything downstream is consuming it.
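
The cold behaviour described above can be sketched as follows. This is only an illustration, assuming the kotlinx.coroutines library is available; coldNumbers and log are made-up names, and println is used instead of Log.d so that the snippet runs outside Android.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what the upstream does so it can be inspected.
val log = mutableListOf<String>()

// A cold flow: the block below runs only while someone is collecting.
fun coldNumbers(): Flow<Int> = flow {
    log += "upstream started"
    emit(1)
    emit(2)
}

fun main(): Unit = runBlocking {
    val numbers = coldNumbers()   // nothing has run yet
    println(log)                  // prints []
    println(numbers.toList())     // prints [1, 2]
    println(numbers.toList())     // prints [1, 2] again: the upstream restarts
    println(log)                  // prints [upstream started, upstream started]
}
```

Each collection runs the builder block again from the start, which is what "cold" means here.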

Flow operator

As in RxJava, operators carry much of the weight of Flow as an asynchronous data-stream framework. A Flow is cold by default: production runs only when downstream consumption occurs.

Therefore, operators are also divided into two types — intermediate operators, which produce no consumption behavior and return Flow, and terminal operators, which produce consumption behavior and trigger production of the Flow.
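
As a rough sketch of the difference between the two kinds of operator, assuming kotlinx.coroutines is available: the counter mapCalls and the function squaresOfEvens are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the map block has run (illustrative only).
var mapCalls = 0

// Intermediate operators only describe the pipeline and return a new Flow.
fun squaresOfEvens(): Flow<Int> =
    flowOf(1, 2, 3, 4)
        .filter { it % 2 == 0 }
        .map { mapCalls++; it * it }

fun main(): Unit = runBlocking {
    val pipeline = squaresOfEvens()
    println(mapCalls)            // prints 0: no terminal operator has been called
    println(pipeline.toList())   // prints [4, 16]: toList is a terminal operator
    println(mapCalls)            // prints 2
}
```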

Creating a Flow

Merely creating a Flow does not execute any of the code inside it. First, let's look at the ways to create one.

  • flow

With the flow {} builder you can quickly create a flow. Inside it, use emit to produce a value (or emitAll to emit every value of another flow), as shown in the following example.

flow {
    for (i in 0..3) {
        emit(i.toString())
    }
}
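
The example above uses only emit. A minimal sketch of emitAll, assuming kotlinx.coroutines is available (headerThenBody is a made-up name), could look like this:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun headerThenBody(): Flow<String> = flow {
    emit("header")                   // a single value
    emitAll(flowOf("a", "b", "c"))   // every value of another flow, in order
}

fun main(): Unit = runBlocking {
    println(headerThenBody().toList())   // prints [header, a, b, c]
}
```
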
  • flowOf

Similar to listOf, flowOf creates a Flow from a fixed set of known values.

flowOf(1, 2, 3)
  • asFlow

asFlow converts a List (or another collection, a sequence, an array, or a range) into a Flow.

listOf(1, 2, 3).asFlow()
  • emptyFlow

As the name suggests, emptyFlow creates an empty flow.
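
For reference, here is a small sketch of asFlow on a few different sources, together with emptyFlow. It assumes kotlinx.coroutines is available and uses toList only to make the results visible.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    println((1..3).asFlow().toList())                  // prints [1, 2, 3]
    println(arrayOf("a", "b").asFlow().toList())       // prints [a, b]
    println(sequenceOf(1.5, 2.5).asFlow().toList())    // prints [1.5, 2.5]
    println(emptyFlow<Int>().toList())                 // prints []
}
```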

Terminal operator

The code that builds a Flow is executed only once a terminal operator is called, much like a Sequence.

  • collect

collect is the most commonly used terminal operator, as shown in the following example.

Most terminal operators, including collect, are suspend functions, so they must be called from a coroutine.

MainScope().launch {
    val time = measureTimeMillis {
        flow {
            for (i in 0..3) {
                Log.d("xys", "emit value---$i")
                emit(i.toString())
            }
        }.collect {
            Log.d("xys", "Result---$it")
        }
    }
    Log.d("xys", "Time---$time")
}
  • collectIndexed

collectIndexed is collect with an index. The index is the position of the value in the order of emission, starting at 0.

MainScope().launch {
    val time = measureTimeMillis {
        flow {
            for (i in 0..3) {
                Log.d("xys", "emit value---$i")
                emit(i.toString())
            }
        }.collectIndexed { index, value ->
            Log.d("xys", "Result in $index --- $value")
        }
    }
    Log.d("xys", "Time---$time")
}
  • collectLatest

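collectLatest cancels the processing of the previous value as soon as a new value is emitted, so only the most recent value is guaranteed to be processed to the end. In the example below, the earlier values are typically cancelled during delay, and only the last value, 3, is logged.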

flowOf(1, 2, 3).collectLatest {
    delay(1)
    Log.d("xys", "Result---$it")
}
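
To make the cancellation visible, here is a sketch that records when processing starts and when it finishes. It assumes kotlinx.coroutines is available; processedValues and the delay values are made up for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

suspend fun processedValues(): List<String> {
    val events = mutableListOf<String>()
    flow {
        emit(1)
        delay(50)
        emit(2)
        delay(50)
        emit(3)
    }.collectLatest { value ->
        events += "start $value"
        delay(200)                // the collector is slower than the producer
        events += "done $value"   // reached only if no newer value arrived meanwhile
    }
    return events
}

fun main(): Unit = runBlocking {
    println(processedValues())   // prints [start 1, start 2, start 3, done 3]
}
```

Processing starts for every value, but only the block for the last value runs to completion.
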
  • toCollection, toSet, toList

These operators collect a Flow into a collection: toList and toSet return a List or a Set, and toCollection appends the values to a mutable collection that you supply.
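
A short sketch of the three operators, assuming kotlinx.coroutines is available. The sorted set passed to toCollection is just one possible destination, chosen here for illustration.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val source = flowOf(3, 1, 3, 2)
    println(source.toList())                           // prints [3, 1, 3, 2]
    println(source.toSet())                            // prints [3, 1, 2]
    // toCollection appends to the collection you pass in, here a sorted set.
    println(source.toCollection(sortedSetOf<Int>()))   // prints [1, 2, 3]
}
```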

  • launchIn

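launchIn starts collecting the Flow in the specified coroutine scope and returns the resulting Job. It is shorthand for scope.launch { flow.collect() }, so it is usually combined with onEach.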

flow {
    for (i in 0..3) {
        Log.d("xys", "emit value---$i")
        emit(i.toString())
    }
}.launchIn(MainScope())
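
The following sketch shows launchIn together with onEach outside Android. It assumes kotlinx.coroutines is available and uses the runBlocking scope in place of MainScope.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val received = mutableListOf<Int>()
    // launchIn is not a suspend function: it starts a collecting coroutine
    // in the given scope and returns its Job immediately.
    val job: Job = flowOf(1, 2, 3)
        .onEach { received += it }
        .launchIn(this)
    println(received)   // prints []: the launched coroutine has not run yet
    job.join()          // wait for the collection to finish
    println(received)   // prints [1, 2, 3]
}
```
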
  • last, lastOrNull, first, firstOrNull

These return the last (or the first) value of the Flow. last and first throw NoSuchElementException if the Flow is empty, whereas lastOrNull and firstOrNull return null in that case.

flow {
    for (i in 0..3) {
        emit(i.toString())
    }
}.last()
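
The difference between the throwing and the OrNull variants only shows up on an empty Flow. A small sketch, assuming kotlinx.coroutines is available:

```kotlin
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.lastOrNull
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val letters = flowOf("a", "b", "c")
    println(letters.first())                     // prints a
    println(letters.last())                      // prints c
    println(emptyFlow<String>().firstOrNull())   // prints null
    println(emptyFlow<String>().lastOrNull())    // prints null
    try {
        emptyFlow<String>().first()
    } catch (e: NoSuchElementException) {
        println("first() threw on an empty flow")
    }
}
```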

State operator

State operators do not change the data; they are callbacks invoked at particular points in the life of the flow.

  • onStart: called before the upstream starts producing data
  • onCompletion: called when the flow completes, fails, or is cancelled
  • onEach: called for each value emitted by the upstream, before it is passed downstream
  • onEmpty: called when the flow completes without having emitted any data
  • catch: catches exceptions thrown in the upstream
  • retry and retryWhen: re-collect the upstream when an exception occurs. retry takes a maximum number of retries, while retryWhen decides from the exception and the attempt number whether to retry
MainScope().launch {
    Log.d("xys", "Coroutine in ${Thread.currentThread().name}")
    val time = measureTimeMillis {
        flow {
            for (i in 0..3) {
                emit(i.toString())
            }
            throw Exception("Test")
        }.retryWhen { _, retryCount ->
            retryCount <= 3
        }.onStart {
            Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
        }.onEach {
            Log.d("xys", "emit value---$it")
        }.onCompletion {
            Log.d("xys", "Flow Complete")
        }.catch { error ->
            Log.d("xys", "Flow Error $error")
        }.collect {
            Log.d("xys", "Result---$it")
        }
    }
    Log.d("xys", "Time---$time")
}

In addition, onCompletion receives the exception that ended the flow, or null if the flow completed normally, as shown below.

.onCompletion { exception ->
    Log.d("xys", "Result---$exception")
}
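
Here is a sketch of how onCompletion and catch interact when the upstream fails. It assumes kotlinx.coroutines is available; runWithFailure and the events list are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

suspend fun runWithFailure(events: MutableList<String>): List<Int> =
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        // Sees the failure first, then lets it continue downstream.
        .onCompletion { cause -> events += "completed, cause=${cause?.message}" }
        // Handles the failure and emits a fallback value.
        .catch { e ->
            events += "caught ${e.message}"
            emit(-1)
        }
        .toList()

fun main(): Unit = runBlocking {
    val events = mutableListOf<String>()
    println(runWithFailure(events))   // prints [1, -1]
    println(events)                   // prints [completed, cause=boom, caught boom]
}
```

Because onCompletion sits before catch here, it sees the exception. If the flow had completed normally, cause would be null.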

Transform operator

As with RxJava, operators can be used to transform the data in a stream to suit different needs.

  • map, mapLatest, mapNotNull

The map operator converts each value of the Flow into a new value by applying the given block.

flow {
    for (i in 0..3) {
        emit(i)
    }
}.map {
    it * it
}
  • transform, transformLatest

The transform operator is similar to map, but not the same: map is a one-to-one conversion, while transform gives you complete control over the stream, so you can filter values, regroup them, emit several values for one input, and so on.

flow {
    for (i in 0..3) {
        emit(i)
    }
}.transform { value ->
    if (value == 1) {
        emit("!!! $value!!!")
    }
}.collect {
    Log.d("xys", "Result---$it")
}
  • transformWhile

The block passed to transformWhile returns a Boolean that controls whether the flow is cut short. If it returns true, the flow continues; if it returns false, the flow stops.

flow {
    for (i in 0..3) {
        emit(i)
    }
}.transformWhile { value ->
    emit(value)
    value == 1 // false for the first value (0), so the flow stops after emitting 0
}.collect {
    Log.d("xys", "Result---$it")
}

Filter operator

As the name suggests, filter operators are used to filter the data in a stream.

  • filter, filterIsInstance, filterNot, filterNotNull

These operators keep values that match a condition (filter), values of a given type (filterIsInstance), values that do not match a condition (filterNot), or non-null values (filterNotNull).

flow {
    for (i in 0..3) {
        emit(i)
    }
}.filter { value ->
    value == 1
}.collect {
    Log.d("xys", "Result---$it")
}
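  • drop, dropWhile, take, takeWhile

These operators discard the first n values (drop) or keep only the first n values (take). The variants with the While suffix use a condition instead of a count: dropWhile discards values until the condition first fails, and takeWhile keeps values until the condition first fails.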
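
The four operators side by side, as a small sketch assuming kotlinx.coroutines is available:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val numbers = (1..6).asFlow()
    println(numbers.drop(2).toList())                // prints [3, 4, 5, 6]
    println(numbers.take(2).toList())                // prints [1, 2]
    println(numbers.dropWhile { it < 4 }.toList())   // prints [4, 5, 6]
    println(numbers.takeWhile { it < 4 }.toList())   // prints [1, 2, 3]
}
```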

  • debounce

The debounce operator is used for debouncing: a value is passed on only if no newer value arrives within the specified timeout, so during a burst of values only the latest one gets through.
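
A sketch of debounce with a 200 ms timeout, assuming kotlinx.coroutines is available. The delays are arbitrary values chosen so that the gaps are clearly shorter or longer than the timeout; debounced is a made-up name.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class)
suspend fun debounced(): List<Int> =
    flow {
        emit(1)
        delay(50)    // 2 arrives within the timeout, so 1 is dropped
        emit(2)
        delay(50)    // 3 arrives within the timeout, so 2 is dropped
        emit(3)
        delay(400)   // quiet period longer than the timeout: 3 gets through
        emit(4)
        delay(400)   // same for 4
        emit(5)      // the last value is emitted when the flow completes
    }.debounce(200L).toList()

fun main(): Unit = runBlocking {
    println(debounced())   // prints [3, 4, 5]
}
```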

  • sample

The sample operator is similar to debounce, but works on a fixed period: at the end of each period it emits the latest value received during that period, if there was one.

  • distinctUntilChangedBy

This de-duplication operator drops a value when the key computed by the given selector is equal to the key of the previous value, so only consecutive duplicates are removed.
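
A sketch of de-duplication by key, assuming kotlinx.coroutines is available. The User class is a made-up data type, and distinctUntilChanged is shown next to it for comparison.

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.distinctUntilChangedBy
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical data type used only for this illustration.
data class User(val id: Int, val name: String)

fun main(): Unit = runBlocking {
    // Compares whole values: only consecutive repeats are dropped.
    println(flowOf(1, 1, 2, 2, 1).distinctUntilChanged().toList())   // prints [1, 2, 1]

    // Compares only the key returned by the selector, here the id.
    val users = flowOf(User(1, "Ann"), User(1, "Anna"), User(2, "Bob"), User(1, "Ann"))
    val names = users.distinctUntilChangedBy { it.id }.toList().map { it.name }
    println(names)   // prints [Ann, Bob, Ann]
}
```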

Combining operator

Combining operators are used to combine data from multiple flows.

  • combine, combineTransform

The combine operator joins two flows by pairing the most recent value of each.

val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
flow1.combine(flow2) { i, s -> i.toString() + s }.collect {
    Log.d("xys", "Flow combine: $it")
}

The output is:

D/xys: Flow combine: 1a
D/xys: Flow combine: 2a
D/xys: Flow combine: 2b
D/xys: Flow combine: 2c

As you can see, when the two flows emit different numbers of values, combine produces a new element every time either flow emits, built from the latest element of flow1 and the latest element of flow2.

  • merge

The merge operator is used to merge multiple streams.

val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
listOf(flow1, flow2).merge().collect {
    Log.d("xys", "Flow merge: $it")
}

The output is:

D/xys: Flow merge: 1
D/xys: Flow merge: 2
D/xys: Flow merge: a
D/xys: Flow merge: b
D/xys: Flow merge: c

merge emits the values of all the streams as they arrive, that is, in chronological order.

  • zip

The zip operator takes one value from each stream and pairs them up. The zipped flow completes as soon as either stream runs out of data.

val flow1 = flowOf(1, 2).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c").onEach { delay(20) }
flow1.zip(flow2) { i, s -> i.toString() + s }.collect {
    Log.d("xys", "Flow zip: $it")
}

The output is:

D/xys: Flow zip: 1a
D/xys: Flow zip: 2b

Thread switching

In Flow, you can simply use flowOn to switch threads. flowOn applies to the upstream, that is, to the flow builder and all operators that come before the flowOn call.

flow {
    for (i in 0..3) {
        Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
        emit(i)
    }
}.map {
    Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
    it * it
}.flowOn(Dispatchers.IO).collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

In this case, both the flow block and the map operation run on an IO thread rather than on the collecting thread.

If flowOn is placed before map instead:

flow {
    for (i in 0..3) {
        Log.d("xys", "Emit Flow in ${Thread.currentThread().name}")
        emit(i)
    }
}.flowOn(Dispatchers.IO).map {
    Log.d("xys", "Map Flow in ${Thread.currentThread().name}")
    it * it
}.collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

The map then runs on the thread of the collecting coroutine, here the main thread.

You can also call flowOn several times to keep switching threads, so that each group of preceding operators runs on a different dispatcher.
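
Calling flowOn more than once can be sketched as follows. This assumes kotlinx.coroutines on the JVM; the two single-threaded dispatchers and their thread names ("producer" and "mapper") are made up so that the stages are easy to tell apart, and stageThreads is a hypothetical helper.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

// Returns one string naming the thread that ran each of the three stages.
suspend fun stageThreads(): String {
    val producer = Executors.newSingleThreadExecutor { r -> Thread(r, "producer") }
        .asCoroutineDispatcher()
    val mapper = Executors.newSingleThreadExecutor { r -> Thread(r, "mapper") }
        .asCoroutineDispatcher()
    try {
        return flow<String> { emit(Thread.currentThread().name) }
            .flowOn(producer)                                  // applies to the flow block
            .map { "$it -> ${Thread.currentThread().name}" }
            .flowOn(mapper)                                    // applies to the first map only
            .map { "$it -> ${Thread.currentThread().name}" }   // runs in the collector's context
            .toList()
            .single()
    } finally {
        producer.close()
        mapper.close()
    }
}

fun main(): Unit = runBlocking {
    // The three names are the producer thread, the mapper thread, and the calling thread.
    println(stageThreads())
}
```

Each flowOn only affects the operators between it and the previous flowOn. Exact thread names may carry a coroutine suffix when the coroutines debug mode is on.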

Cancel the Flow

Flow can also be cancelled, most commonly with withTimeoutOrNull, as shown below.

MainScope().launch {
    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            Log.d("xys", "Flow: $it")
        }
    }
}

With this code, the Flow is cancelled after 1 and 2 have been printed.

The cancellation of Flow actually depends on the cancellation of coroutines.
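
To illustrate that point, here is a sketch in which the collecting coroutine is cancelled directly through its Job instead of through a timeout. It assumes kotlinx.coroutines is available; collectUntilCancelled and the delay values are made up for illustration.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

suspend fun collectUntilCancelled(): List<Int> = coroutineScope {
    val received = mutableListOf<Int>()
    val job = flow {
        for (i in 1..5) {
            delay(200)   // cancellation takes effect at suspension points such as this one
            emit(i)
        }
    }.onEach { received += it }.launchIn(this)

    delay(500)            // let the values emitted at 200 ms and 400 ms arrive
    job.cancelAndJoin()   // cancelling the coroutine stops the collection
    received
}

fun main(): Unit = runBlocking {
    println(collectUntilCancelled())   // prints [1, 2]
}
```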

A synchronous non-blocking model for Flow

By default, a Flow runs in the context of the coroutine that collects it and does not switch threads. That is the synchronous part. What about non-blocking? emit and collect are both suspend functions. When a suspend function suspends, it does not block the thread: it hands the thread over to other code that needs to run, and resumes once it can continue.

So, let’s look at this example.

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.collect {
    Log.d("xys", "Result---$it")
}

The output is:

D/xys: Start Flow in main
D/xys: emit value---0
D/xys: Result---0
D/xys: emit value---1
D/xys: Result---1
D/xys: emit value---2
D/xys: Result---2
D/xys: emit value---3
D/xys: Result---3

Emit one, collect one: this is the synchronous non-blocking model. The producer and the collector take turns, each yielding to the other, so the code in flow and the code in collect appear to execute in lockstep.

Asynchronous non-blocking model

If we add a thread switch to the Flow and let the Flow execute on a child thread, let’s look at the execution again.

flow {
    for (i in 0..3) {
        emit(i)
    }
}.onStart {
    Log.d("xys", "Start Flow in ${Thread.currentThread().name}")
}.onEach {
    Log.d("xys", "emit value---$it")
}.flowOn(Dispatchers.IO).collect {
    Log.d("xys", "Collect Flow in ${Thread.currentThread().name}")
    Log.d("xys", "Result---$it")
}

The output is:

D/xys: Start Flow in DefaultDispatcher-worker-1
D/xys: emit value---0
D/xys: emit value---1
D/xys: emit value---2
D/xys: emit value---3
D/xys: Collect Flow in main
D/xys: Result---0
D/xys: Collect Flow in main
D/xys: Result---1
D/xys: Collect Flow in main
D/xys: Result---2
D/xys: Collect Flow in main
D/xys: Result---3

At this point, Flow follows an asynchronous non-blocking model. This is easier to understand: the producer and the collector run on different threads, so neither waits for the other. Because the code in the flow has no delay, it emits all of its values first, while collect receives them on the main thread.

In addition to using flowOn to switch threads, an asynchronous non-blocking model can also be implemented using channelFlow.
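
A minimal sketch of channelFlow, assuming kotlinx.coroutines is available. numbersFromWorker is a made-up name, and Dispatchers.Default is used in place of Dispatchers.IO only to keep the example generic.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Unlike flow { }, channelFlow allows values to be sent from another context.
fun numbersFromWorker(): Flow<Int> = channelFlow {
    withContext(Dispatchers.Default) {
        for (i in 0..3) {
            send(i)   // send replaces emit inside channelFlow
        }
    }
}

fun main(): Unit = runBlocking {
    println(numbersFromWorker().toList())   // prints [0, 1, 2, 3]
}
```

The values are produced on a worker thread and handed to the collector through a channel, so the producer and the collector are decoupled in much the same way as with flowOn.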

I would also like to recommend my website, xuyisheng.top, which focuses on Android, Kotlin, and Flutter. You are welcome to visit.