Your support means a lot to me!

🔥 Hi, I’m Peng. This article has been included in GitHub · Android-Notebook. Here are Android advanced growth route notes & blog, like-minded friends, welcome to grow with me. (Contact & Group entry on GitHub)

background

  • Kotlin Flow is a data Flow framework based on the basic capabilities of Kotlin coroutines. It is a solution between LiveData and RxJava in terms of functional complexity. Kotlin Flow has more capabilities than LiveData, but it is more streamlined by tailoring RxJava’s many complex operators. And with the support of Kotlin coroutines, Kotlin Flow is currently the data Flow framework promoted by Google.

1. Why use Flow?

LiveData, Kotlin Flow and RxJava all belong to the observable data container class. Observer mode is their same basic design mode. What are the advantages of Kotlin Flow compared with the other two?

LiveData is a component of the Android X package and is a simple life-cycle aware container in the Android ecosystem. Simplicity is both its strength and its limitation, and of course these limitations should not be counted against LiveData, which is designed to be a simple data container. For simple data flow scenarios, there is no problem using LiveData.

  • LiveData can only update data on the main thread: setValue can only be set on the main thread, even if postValue is switched to the main thread.
  • LiveData data playback problem:Registering a new subscriber will re-receive the data stored by LiveData, which in some cases is not expected (you can use a custom LiveData subclassSingleLiveData 或 UnPeekLiveDataSolve, not expand here);
  • LiveData does not shake:Repeat setValue for the same value and the subscriber will receive it multiple timesonChanged()Callbacks (can be useddistinctUntilChanged()Solve, not expand here);
  • LiveData does not support back pressure:When the data production speed is greater than the data consumption speed, LiveData cannot be processed properly. Such as a large number of child threadspostValueWhen the main thread is not consuming enough data, some of the data in the middle is ignored.

RxJava is a component developed by ReactiveX, a third-party organization. Rx is a multi-language data flow framework, including Java, Go and other languages. It is powerful to its advantage, supporting a large number of rich operators, as well as thread switching and back pressure. However, the high learning threshold for Rx is a new burden for development and the risk of misuse.

Kotlin is a component of the Kotlinx package, not a product of the Android ecosystem. So what’s Flow’s advantage?

  • Flow supports coroutines: Based on the basic capability of coroutines, Flow can produce and consume data in a structured and concurrent way, and can realize thread switching (relying on the Dispatcher of coroutines).
  • Flow supports back pressure. SharedFlow, a subclass of Flow, supports the configuration of cache capacity to cope with the situation where the data production speed is higher than the data consumption speed.
  • Flow supports data replay configuration: A subclass of Flow, SharedFlow, supports configuring replay replay, and can customize the configuration of data replay for new subscribers.
  • Flow has a lower learning threshold than RxJava: Flow has more streamlined functions and a higher learning cost ratio. However, Flow is based on coroutines, and there is some learning cost in coroutines, but this should be broken down.

Of course, Kotlin Flow also has some limitations:

  • Flows are not Lifecycle aware components: Flows are not a product of the Android ecosystem, and natural flows do not care about component lifecycles. So how do we ensure that the subscriber does not update the View in the wrong state while listening to the Flow data Flow? This question is addressed in section 6 below.

2. Cold data flow and hot data flow

Kotlin Flow consists of three entities: data producer – (optional) broker – data consumer. The data producer is responsible for emitting data to the data stream, while the data consumer consumes data from the data stream. Kotlin Flow can be divided into cold Flow and hot Flow according to the timing of production data:

  • Plain Flow (cold Flow) : Cold flows are not shared and have no caching mechanism. Cold stream executes the code for transmitting data stream on demand only when the subscriber collects data. Cold flow has a one-to-one relationship with subscribers. The data flow between multiple subscribers is independent of each other and automatically shuts down once the subscriber stops listening or the production code ends.
  • SharedFlow/StateFlow (Heat flow) :The heat flow is shared and cached. The data can be produced and cached whether or not there is subscriber collect data. Hot flows and subscribers have a one-to-many relationship where multiple subscribers can share the same data flow. When a subscriber stops listening, the data stream is not automatically closed (unless usedWhileSubscribedStrategy, more on this later).


3. Normal Flow

Normal flows are cold flows where data is not shared and there is no caching mechanism. The data source delays producing data until the consumer starts listening (such as the terminal operation COLLECT {}), and an entirely new data stream is created with each subscription. Once the consumer stops listening or the producer code ends, the Flow closes automatically.

val coldFlow: Emit (result) delay(100)}}. Collect {data ->}Copy the code

The main operations of cold Flow are as follows:

  • Create flow{} : The flow constructor creates a new data flow. Flow {} is the suspend function and needs to be executed in a coroutine;
  • Emit data emit() : emit() sends a new value to the data stream;
  • Collect {} :Trigger data stream consumption to retrieve all emitted values in the data stream. Flow is a cold Flow, and the data Flow will be delayed until the terminal operation COLLECT. Moreover, every time collect is repeatedly called on Flow, the Flow {} will be repeatedly executed to trigger the data sending action.AbstractFlow). Collect is the suspend function and needs to be executed in a coroutine.
  • Catch {} : Catch {} catches exceptions that occur in the data stream;
  • FlowOn () : changes the CoroutineContext CoroutineContext for upstream data operations without affecting downstream operations. If there are multiple flowOn operators, each flowOn will only change the upstream data flow at the current location;
  • OnStart: triggered before data starts to be sent, callback in the data production thread;
  • OnCompletion: Triggered after the data is sent, and called back from the data production thread;
  • State callback onEmpty: Triggered when the data stream is empty (at the end of data sending but no data is actually sent) and called back in the data production thread.

In AbstractFlow, you can see that the collector code block is executed once every time the terminal operation COLLECT is called, i.e., the data production code is re-executed:

AbstractFlow.kt

public abstract class AbstractFlow<T> : Flow<T> { @InternalCoroutinesApi public final override suspend fun collect(collector: FlowCollector<T>) { // 1. Val safeCollector = safeCollector (collector, coroutineContext) try {// 2. Execute the flow{} code block collectSafely(safeCollector)} finally {// 3. Release coroutines related parameters safeCollector. ReleaseIntercepted ()}} public abstract suspend fun collectSafely (collector: FlowCollector<T>) } private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() { override suspend fun collectSafely(collector: FlowCollector<T>) { collector.block() } }Copy the code

4. SharedFlow — Advanced edition of LiveData

StateFlow is a subclass of SharedFlow, so we’ll start with SharedFlow. Both SharedFlow and StateFlow are hot flows that can produce and cache data regardless of whether a subscriber (collect) exists. They both have a mutable version, MutableSharedFlow and MutableStateFlow, similar to LiveData and MutableLiveData, that should be used when exposing interfaces.

4.1 Interface between SharedFlow and MutableSharedFlow

Direct to the interface does not understand, here is the first release of the two interfaces for easy viewing:

Public interface SharedFlow<out T> : Flow<T> {public val replayCache: List<T> } public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> override suspend fun emit(value: T) // Attempt to emit data (if cache overflow policy is SUSPEND, then false is returned) public fun tryEmit(value: T): Boolean number of active subscribers public val subscriptionCount: Public fun resetReplayCache()} public fun resetReplayCache()} StateFlow<Int>Copy the code

4.2 Constructing a SharedFlow

I will think of SharedFlow as a highly configured version of LiveData, which is shown first in the constructor. The SharedFlow constructor allows us to configure three parameters:

SharedFlow.kt

Public fun <T> MutableSharedFlow(public fun <T> MutableSharedFlow( Int = 0, // Cache overflow policy onBufferOverflow: BufferOverflow = bufferoverflow.suspend): MutableSharedFlow<T> { val bufferCapacity0 = replay + extraBufferCapacity val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow return SharedFlowImpl(replay, bufferCapacity, OnBufferOverflow)} public enum class overflow {SUSPEND, // discard the oldest DROP_LATEST, // discard the latest DROP_LATEST}Copy the code
parameter describe
reply Number of replay data, which will be replayed when a new subscriber registers
extraBufferCapacity Extra cache capacity, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow, extra cache capacity of SharedFlow
onBufferOverflow Cache overflow policy: SUSPEND, DROP_OLDEST, and DROP_LAST when the cache capacity is full

SharedFlow default capacity is 0, replay is 0, and the cache overflow policy is SUSPEND, which means that when transmitting data, the data is directly discarded and suspended (read the emit source code), and the subscriber receives no data at all.

A simple comparison between SharedFlow and LiveData shows why SharedFlow can be understood as “high edition” LiveData:

  • Capacity problem: The LiveData capacity is fixed to one, while the SharedFlow capacity can be configured to zero or multiple.
  • Back pressure: LiveData can’t handle back pressure, while SharedFlow has cache space to handle back pressure;
  • Replaying problem: LiveData replays 1 data, while SharedFlow supports replaying 0 or more data.
  • Threading issues: LiveData can only be subscribed in the main thread, while SharedFlow supports subscriptions in any thread (via the Dispatcher of the coroutine).

Of course, SharedFlow is not a complete victory. LiveData can deal with life cycle security issues, while SharedFlow cannot (because Flow itself is not a component of the pure Android ecosystem). Improper use will lead to unnecessary operations and waste of resources. And the risk of updating the View in the wrong state. Don’t worry though, this can be solved by Lifecycle API in Section 6.

4.3 Convert common Flow to SharedFlow

As mentioned earlier, cold flow is not shared and there is no caching mechanism. Using flow. shareIn or flow.statein, you can turn a cold stream into a hot stream, both to share data with multiple subscribers and to add buffering.

Share.kt

Public fun <T> Flow<T>. ShareIn (// CoroutineScope scope: CoroutineScope, // Started policy: SharingStarted, // Control the number of data playback replay: Int = 0 ): SharedFlow<T> { val config = configureSharing(replay) val shared = MutableSharedFlow<T>( replay = replay, extraBufferCapacity = config.extraBufferCapacity, onBufferOverflow = config.onBufferOverflow ) @Suppress("UNCHECKED_CAST") scope.launchSharing(config.context, Upstream, shared, started, NO_VALUE as T) return shared.assharedflow ()} public companion object {// Start immediately, and terminate when the scope specified by scope ends: SharingStarted = StartedEagerly() // Lazy: starts when the first subscriber is registered and terminates when the scope specified by scope ends public val Lazily: SharingStarted = StartedLazily() public fun WhileSubscribed( stopTimeoutMillis: Long = 0, replayExpirationMillis: Long = Long.MAX_VALUE ): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis) }Copy the code

SharedIn parameters scope and replay need not be explained too much, mainly introduced started: SharingStarted strategy, divided into three types:

  • Sometimes, data flows are started immediately and are held until scope ends.

  • Lazily: starts when the first subscriber registers and keeps data flowing (until scope ends);

  • WhileSubscribed() : Started when the first subscriber registers and keeps the data flow until completed when the last subscriber logs out (or until the scope specified by scope ends). The WhildSubscribed() strategy stops the data stream when there are no subscribers, avoiding unnecessary resource waste such as constantly reading data from a database or sensor.

    WhileSubscribed () also provides two configuration parameters:

    • StopTimeoutMillis Timeout period (ms) : The timeout period for which the data stream is retained after the last subscriber unsubscribes. the default value is 0, indicating that the data stream is stopped immediately. This parameter helps prevent the data stream from being shut down immediately after a subscriber logs out for a short time. If you want to stop the data flow after five seconds if there are no subscribers, you can use whileSubscribed(5000).
    • ReplayExpirationMillis Playback expiration time (ms) : MAX_VALUE specifies the expiration date of a data stream. The default value is long. MAX_VALUE. ReplayExpirationMillis time occurs after stopTimeoutMillis. If you want to stop the data flow after five seconds and if the data you wait five seconds is considered stale, you can use whileSubscribed(5000, 5000).

StateFlow — Alternative to LiveData

StateFlow is a sub-interface of SharedFlow and can be understood as a special SharedFlow. However, their inheritance relationship is only the interface inheritance relationship, the internal implementation class SharedFlowImpl and StateFlowImpl are actually separate, just to keep in mind.

5.1 Interface between StateFlow and MutableStateFlow

Here are two interfaces for viewing:

Public interface StateFlow<out T> : SharedFlow<T> {val value: T} public interface StateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {public override var value: Public fun compareAndSet(expect: T, update: T): Boolean} public fun compareAndSet(expect: T, update: T): BooleanCopy the code

5.2 Constructing a StateFlow

The StateFlow constructor is much simpler and takes only one mandatory argument, representing the initial value:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ? : NULL)Copy the code

5.3 Special SharedFlow

StateFlow is a special configuration of SharedFlow. A line of code like MutableStateFlow(initialValue) is essentially the same as using SharedFlow as follows:

val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior
Copy the code
  • Initial value: The initial value must be passed in when StateFlow is initialized.
  • Capacity 1: StateFlow stores only one value;
  • Replay 1: StateFlow replays the latest value to the new subscriber;
  • ResetReplayCache () is not supported:StateFlow’s resetReplayCache() method throwsUnsupportedOperationException
  • The cache overflow policy is DROP_OLDEST: indicates that new data will overwrite old data each time.

In general, StateFlow requires an initial value to be passed in, and only supports saving the latest data, replaying the latest value to new subscribers, and does not allow resetting to slow down storage. It is no exaggeration to say that StateFlow is an alternative to LiveData. In addition, StateFlow supports several additional features:

  • Data buffeting: it means that the callback will be made only when the updated value changes. If the updated value does not change, the callback will not be made. In fact, a layer of interception is added when transmitting data:

StateFlow.kt

public override var value: T get() = NULL.unbox(_state.value) set(value) { updateState(null, value ? : NULL) } override fun compareAndSet(expect: T, update: T): Boolean = updateState(expect ? : NULL, update ? : NULL) private fun updateState(expectedState: Any? , newState: Any): Boolean { var curSequence = 0 var curSlots: Array<StateFlowSlot? >? = this.slots // benign race, we will not use it synchronized(this) { val oldState = _state.value if (expectedState ! = null && oldState ! = expectedState) return false // CAS support if (oldState == newState) return true But CAS returns true _state.value = newState... return true } }Copy the code
  • CAS operation: atomic comparison and set operation that returns true only if the old value is the same as Expect.

5.4 Converting Common Flow to StateFlow

Like SharedFlow, regular flows can be converted to StateFlow:

Share.kt

Public fun <T> Flow<T>. StateIn (scope: CoroutineScope, // Share started policy: SharingStarted, // initialValue: T): StateFlow<T> { val config = configureSharing(1) val state = MutableStateFlow(initialValue) scope.launchSharing(config.context, config.upstream, state, started, initialValue) return state.asStateFlow() }Copy the code

6. Safely observe the Flow data Flow

As mentioned above, Flow does not have the life cycle awareness of LiveData, so subscribers will have the problem of life cycle safety when they listen to Flow data Flow. Google recommends using the Lifecycle#repeatOnLifecycle API:

/ / since 2.4.0 support Lifecycle# repeatOnLifecycle API implementation "androidx. Lifecycle: lifecycle - runtime - KTX: against 2.4.1." "Copy the code
  • LifecycleOwner#addRepeatingJob: automatically creates and starts a coroutine execution block when the life cycle reaches the specified state and automatically removes the coroutine when the life cycle falls below that state. Because addRepeatingJob is not a suspend function, it does not follow the rules of structured concurrency. Currently deprecated and replaced by repeatOnLifecycle() below (the reason for deprecating the addRepeatingJob is the story behind designing the repeatOnLifecycle API);
  • Lifecycle#repeatOnLifecycle: repeatOnLifecycle does the same thing except that it is a suspend function and needs to be executed in a coroutine;
  • Flow#flowWithLifecycle: Flow#flowWithLifecycle works the same way internally based on repeatOnLifecycle API.
class LocationActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) lifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) { locationProvider.locationFlow().collect { // update UI } } } } class LocationActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) {super.oncreate (savedInstanceState) // repeatOnLifecycle is to suspends, So lifecycleScope. Launch {// When lifecycleScope's lifecycle is higher than STARTED, Start a new coroutine and execute a block of code // When the lifecycleScope lifecycle is less than STARTED, RepeatOnLifecycle (Lifecycle.state.started) {// The current Lifecycle must be higher than the STARTED State, and the data can be safely fetched from the data stream. And update the View locationProvider. LocationFlow (). Collect {/ / update the UI}} / / structured concurrent: }}} Class LocationActivity: AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) locationProvider.locationFlow() .flowWithLifecycle(this, Lifecycle.State.STARTED) .onEach { // update UI } .launchIn(lifecycleScope) } }Copy the code

What exactly happens if you don’t use the Lifecycle#repeatOnLifecycle API?

  • Activity. LifecycleScope. Launch: immediately start coroutines, and destroyed the Activity in disappear assist process;
  • Fragments. LifecycleScope. Launch: immediately start coroutines, and took a disappear assist cheng in the fragments destroyed;
  • Fragments. ViewLifecycleOwner. LifecycleScope. Launch: immediately start coroutines, and views in the Fragment is destroyed in disappear assist process.

As you can see, these coroutine apis only remove the coroutine when the last component/view is destroyed. The coroutine is not cancelled when the view is in the background, and Flow continues to produce data and trigger an update to the view.

  • LifecycleContinueScope. LaunchWhenX: state specified in the life cycle to immediately start coroutines execution code block, suspends in the life cycle is lower than the state, not cancel coroutines, in the life cycle to higher than the specified state, automatic recovery the coroutines.

As you can see, these coroutine apis suspend the coroutine when the view leaves a state, avoiding updating the view. However, Flow continues to produce data, which can also lead to unnecessary operations and resource consumption (CPU and memory). Although the coroutine can be fetched manually when the view goes into the background, the template code is obviously added and not as concise as the repeatOnLifecycle API.

Class LocationActivity: AppCompatActivity() {// Coroutine controller private var locationUpdatesJob: Job? = null override fun onStart() { super.onStart() locationUpdatesJob = lifecycleScope.launch { LocationProvider. LocationFlow (). Collect {/ / update the UI}}} to override fun onStop () {/ / in the view's background in disappear assist locationUpdatesJob cheng? .cancel() super.onStop() } }Copy the code

Looking back, how does repeatOnLifecycle implement lifecycle awareness? Lifecycle#addObserver to listen for lifecycle changes:

RepeatOnLifecycle.kt

suspendCancellableCoroutine<Unit> { cont -> // Lifecycle observers that executes `block` when the lifecycle reaches certain state, and // cancels when it falls below that state. val startWorkEvent = Lifecycle.Event.upTo(state) val cancelWorkEvent = Lifecycle.Event.downFrom(state) val mutex = Mutex() observer = LifecycleEventObserver { _, event -> if (event == startWorkEvent) { // Launch the repeating work preserving the calling context launchedJob = [email protected] { // Mutex makes invocations run serially, // coroutineScope ensures all child coroutines finish mutex.withLock { coroutineScope { block() } } } return@LifecycleEventObserver } if (event == cancelWorkEvent) { launchedJob? .cancel() launchedJob = null } if (event == Lifecycle.Event.ON_DESTROY) { cont.resume(Unit) } } [email protected](observer as LifecycleEventObserver) }Copy the code

7. The Channel Channel

In addition to the Flow API mentioned above, there is also a Channel API. A Channel is a data structure in Kotlin that implements cross-coroutine data transmission, similar to a BlockQueue in Java. The difference is that blockqueues block threads, whereas channels are suspended threads. Google recommends using Flow rather than Channel. The main reason is that Flow shuts down data Flow more automatically, and once a Channel is not shut down properly, it is easy to cause resource leakage. In addition, Flow provides more explicit constraints and operators than Channel, and is more flexible.

Channel operations are as follows:

  • Create Channel: Create a Channel object with Channel(channel.unlimited), or create a producer coroutine directly with produce{};
  • Close Channel: Channel#close();
  • Sending data: Channel#send() sends a data to a Channel. The send() operation is suspended if the Channel capacity is insufficient. The default capacity of a Channel is 1.
  • Receive data: Pull a piece of data from a Channel through Channel#receive(), or create a consumer coroutine directly through actor. The receive() operation will hang if there is insufficient data in the Channel.
  • BroadcastChannel (deprecated, using SharedFlow) : In a BroadcastChannel, data is received by only one consumer. A BroadcastChannel allows multiple consumers to receive data.
Public fun <E> Channel(// Buffer capacity, onBufferOverflow reject policy if capacity is exceeded) Int = RENDEZVOUS, // Buffer overflow policy, default to hang, and DROP_OLDEST and DROP_LATEST onBufferOverflow: BufferOverflow = bufferoverflow.suspend, // Handle elements that fail to be delivered successfully, such as subscriber cancellation or exception thrown onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E>Copy the code

8. Taste it

LiveData, Flow and Channel have all been covered here. How to use them in real scenarios?

  • Event: The Event is valid once. The new subscriber should not receive the old Event, so the Event data is suitable for SharedFlow(replay=0);
  • State: State is recoverable and new subscribers are allowed to receive old State data, so State data is suitable for use with StateFlow.

Android UI architecture evolution: From MVC to MVP, MVVM, and MVI

BaseViewModel.kt

interface UiState interface UiEvent interface UiEffect abstract class BaseViewModel<State : UiState, Event : UiEvent, Effect: UiEffect> : ViewModel() {private val initialState: State by lazy {createInitialState()} // The required State of the page, ViewState corresponding to MVI mode private val _uiState = MutableStateFlow<State>(initialState) // Use immutable version val uiState = for external interfaces _uistate.asStateflow () // A "side effect" of a page state change, like a one-time event, State changes that do not need to be replayed (e.g. Toast) private val _effect = MutableSharedFlow<Effect>() // Use the immutable version of the external interface val Effect = _effect.assharedFlow () Intent private val _event = MutableSharedFlow<Event>() init {viewModelscope.launch {_event.collect { HandleEvent (it)}} // Initial state protected Abstract fun createInitialState(): State protected abstract fun handleEvent(event: event) /** * event entry */ fun sendEvent(event: Event) {viewModelscope.launch {_event.emit(Event)}} State) {_uistate. value = newState} /** ** protected fun setEffect(effect: effect) {_effect.send(effect)}}Copy the code

The resources

  • Coroutines Flow best practice | application based on the Android developer summit – Android official documentation
  • The story behind the design of the repeatOnLifecycle API – Official Android documentation
  • A more secure way to collect Android UI data streams — Official Android documentation
  • Flow operators shareIn and stateIn usage notes — Official Android documentation
  • Migrating from LiveData to Kotlin Data Streams – Official Android documentation
  • Solving development Pain Points with Kotlin Flow by Duliang Ren
  • Kotlin – Unwinding Flow in coroutines – Nine hearts
  • Kotlin Flow practice summary! — Written by an enchanted wax gourd
  • Android — Kotlin-Channel by HQK

Your likes mean a lot to me! Wechat search public number [Peng Xurui], I hope we can discuss technology together, find like-minded friends, we will see you next time!