The Flow.shareIn and Flow.stateIn operators convert cold flows into hot flows: they multicast the data coming from an upstream cold flow to multiple collectors. They are commonly used to improve performance, to add a buffer while no collectors are present, or as a caching mechanism.

Note: Cold flows are created on demand and emit data when they are observed; hot flows are always active and can emit data whether or not they are being observed.

This article will familiarize you with the shareIn and stateIn operators through examples. You’ll learn how to configure them for specific use cases and avoid common pitfalls that you may encounter.

The underlying data flow producer

Let's continue with the example from my previous articles: an underlying flow producer that emits location updates. It is a cold flow implemented using callbackFlow. Each new collector triggers the flow's producer block, which adds a new callback to the FusedLocationProviderClient.

class LocationDataSource(
    private val locationClient: FusedLocationProviderClient
) {
    val locationsSource: Flow<Location> = callbackFlow<Location> {
        val callback = object : LocationCallback() {
            override fun onLocationResult(result: LocationResult?) {
                result ?: return
                // trySend replaces the deprecated offer and never throws
                trySend(result.lastLocation)
            }
        }
        locationClient.requestLocationUpdates(createLocationRequest(), callback, Looper.getMainLooper())
            .addOnFailureListener { e ->
                close(e) // close the flow if the request fails
            }
        // Clean up when the flow is no longer collected
        awaitClose {
            locationClient.removeLocationUpdates(callback)
        }
    }
}

Let’s look at how you can optimize the locationsSource data flow using shareIn and stateIn in different use cases.

shareIn or stateIn?

The first topic we’ll discuss is the difference between shareIn and stateIn. The shareIn operator returns SharedFlow and stateIn returns StateFlow.

Note: For more information about StateFlow and SharedFlow, check out our documentation.

StateFlow is a specialized configuration of SharedFlow optimized for sharing state: the last emitted item is replayed to new collectors, and items are conflated using Any.equals. You can read more about this in the StateFlow documentation.

The main difference between the two is that the StateFlow interface lets you access the last emitted value synchronously by reading its value property. SharedFlow offers no such property.
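A minimal sketch of that difference, assuming only the kotlinx.coroutines library; the helper names latestState and replaySnapshot are made up for illustration. A StateFlow always holds a current value that can be read without collecting, while the closest thing a SharedFlow offers is a snapshot of its replay cache:

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical helper: StateFlow exposes its latest value synchronously.
fun latestState(): Int {
    val state = MutableStateFlow(0)   // a StateFlow always needs an initial value
    state.value = 1
    state.value = 2
    return state.value                // readable without any collector
}

// Hypothetical helper: SharedFlow has no `value` property; replayCache is a
// snapshot of the items that would be replayed to a new collector.
fun replaySnapshot(): List<Int> {
    val shared = MutableSharedFlow<Int>(replay = 1)
    shared.tryEmit(1)                 // with no collectors, only the replay cache is updated
    shared.tryEmit(2)
    return shared.replayCache
}

fun main() {
    println(latestState())
    println(replaySnapshot())
}
```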

Improve performance

These APIs can improve performance by sharing the same flow instance among all collectors, instead of creating a new instance of the same flow on demand.

In the following example, LocationRepository consumes the locationsSource flow exposed by LocationDataSource and applies the shareIn operator, so that everyone interested in the user's location collects from the same flow instance. Only one instance of the locationsSource flow is created, and it is shared by all collectors:

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource.shareIn(externalScope, WhileSubscribed())
}

The WhileSubscribed sharing policy cancels the upstream flow when there are no collectors. This way, we avoid wasting resources when no one is interested in location updates.

Tip for Android apps! In most cases, you can use WhileSubscribed(5000) to keep the upstream flow active for five seconds after the last collector disappears. This avoids restarting the upstream flow in certain situations, such as configuration changes. The tip is especially helpful when the upstream flow is expensive to create, or when these operators are used in a ViewModel.
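The sharing behaviour can be sketched outside Android. The example below is an illustration under assumptions, not the repository code: upstreamStarts is a hypothetical helper, the counter-emitting flow stands in for locationsSource, and a scope built inside runBlocking stands in for externalScope. It counts how often the producer block runs when two collectors are active at the same time:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns how many times the cold producer block was started
// while two concurrent collectors each took three items.
fun upstreamStarts(share: Boolean): Int = runBlocking {
    val starts = AtomicInteger(0)
    val cold: Flow<Int> = flow {
        starts.incrementAndGet()              // runs once per upstream collection
        var i = 0
        while (true) { emit(i++); delay(10) }
    }
    // Stand-in for externalScope; it has its own Job and is cancelled below.
    val scope = CoroutineScope(coroutineContext + Job())
    val source = if (share) cold.shareIn(scope, SharingStarted.WhileSubscribed()) else cold

    val first = launch { source.take(3).collect() }
    val second = launch { source.take(3).collect() }
    joinAll(first, second)
    scope.cancel()
    starts.get()
}

fun main() {
    println("cold: ${upstreamStarts(share = false)} producer start(s)")
    println("shared: ${upstreamStarts(share = true)} producer start(s)")
}
```

Without sharing, each collector starts its own producer; with shareIn, both collectors are served by a single upstream collection.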

Buffer events

In the following example, our requirements change. We are now asked to always listen for location updates and to display the last 10 locations on the screen when the app returns from the background:

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource
            .shareIn(externalScope, SharingStarted.Eagerly, replay = 10)
}

We set the replay parameter to 10 to keep the last 10 emitted items in memory and re-emit them every time a collector observes the flow. To keep the underlying flow active and emitting location updates even when there are no collectors, we use the SharingStarted.Eagerly policy.
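A small sketch of the replay behaviour, assuming only kotlinx.coroutines; lateCollectorSees is a hypothetical helper, and a five-item flow stands in for the location updates. The upstream is started eagerly and finishes before anyone collects, yet a late collector still receives the most recent items:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns what a collector that arrives late receives.
// `replay` must be greater than 0 because take() rejects a count of 0.
fun lateCollectorSees(replay: Int): List<Int> = runBlocking {
    // Stand-in for externalScope; cancelled at the end of the demo.
    val scope = CoroutineScope(coroutineContext + Job())
    val shared = flowOf(1, 2, 3, 4, 5)
        .shareIn(scope, SharingStarted.Eagerly, replay)

    delay(100)                                // upstream emits everything with no collectors
    val seen = shared.take(replay).toList()   // the late collector gets the replayed items
    scope.cancel()
    seen
}

fun main() {
    println(lateCollectorSees(replay = 3))
}
```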

Cache data

Our requirements have changed again: this time we no longer need to listen for location updates all the time while the app is in the background. However, we need to cache the last emitted item so that the user always sees some data on the screen (even if it is stale) while the current location is being retrieved. For this case, we can use the stateIn operator.

class LocationRepository(
    private val locationDataSource: LocationDataSource,
    private val externalScope: CoroutineScope
) {
    val locations: Flow<Location> = 
        locationDataSource.locationsSource.stateIn(externalScope, WhileSubscribed(), EmptyLocation)
}

Flow.stateIn caches the last emitted item and replays it to new collectors.
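The caching can be sketched as follows, again assuming only kotlinx.coroutines. cachedValues is a hypothetical helper, and plain strings stand in for Location and EmptyLocation. The helper reads the state before anyone collects, after the only collector has left, and through a new collector:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns [value before collecting, value after the
// collector left, first value seen by a new collector].
fun cachedValues(): List<String> = runBlocking {
    // Stand-in for externalScope; cancelled at the end of the demo.
    val scope = CoroutineScope(coroutineContext + Job())
    val upstream = flow {
        emit("location-1")
        emit("location-2")
        awaitCancellation()                    // stay active like a real location source
    }
    val state = upstream.stateIn(scope, SharingStarted.WhileSubscribed(), "empty")

    val before = state.value                   // initial value: upstream not started yet
    state.first { it == "location-2" }         // collect until the latest item arrives
    val after = state.value                    // still cached with no collectors left
    val replayed = state.first()               // a new collector gets the cached item at once
    scope.cancel()
    listOf(before, after, replayed)
}

fun main() {
    println(cachedValues())
}
```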

Watch out! Do not create a new instance on every function call

Never use shareIn or stateIn to create a new flow that is returned from a function. Doing so creates a new SharedFlow or StateFlow on every call, and each one stays in memory until its scope is cancelled or until it is garbage collected once nothing references it.

class UserRepository(
    private val userLocalDataSource: UserLocalDataSource,
    private val externalScope: CoroutineScope
) {
    // Do not use shareIn or stateIn in functions like this
    // This will create a new SharedFlow or StateFlow on each invocation, and these will not be reused.
    fun getUser(): Flow<User> =
        userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    

    // You can use shareIn or stateIn in properties
    val user: Flow<User> = 
        userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())
}
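The pitfall can be made visible with a reference comparison. In this sketch, which assumes only kotlinx.coroutines, UserRepositorySketch and sharingInstances are hypothetical names and a flow of integers stands in for the user data:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical repository used only for this illustration.
class UserRepositorySketch(private val scope: CoroutineScope) {
    private val source: Flow<Int> = flowOf(1, 2, 3)

    // Anti-pattern: every call builds and launches a brand-new SharedFlow.
    fun userPerCall(): SharedFlow<Int> =
        source.shareIn(scope, SharingStarted.WhileSubscribed())

    // Preferred: created once when the class is initialized, then reused.
    val user: SharedFlow<Int> =
        source.shareIn(scope, SharingStarted.WhileSubscribed())
}

// Returns (function calls share an instance?, property reads share an instance?).
fun sharingInstances(): Pair<Boolean, Boolean> {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val repo = UserRepositorySketch(scope)
    val perCall = repo.userPerCall() === repo.userPerCall()
    val property = repo.user === repo.user
    scope.cancel()
    return perCall to property
}

fun main() {
    println(sharingInstances())
}
```

Two calls to the function return two distinct hot flows, each with its own sharing coroutine in the scope, whereas the property always hands out the same instance.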

Flows that require input

Flows that require input, such as a userId, cannot be shared easily with shareIn or stateIn. Take the open-source Google I/O Android app, iosched, as an example: as you can see in its source code, the flow that gets user events from Firestore is implemented using callbackFlow. Because it takes the userId as a parameter, this flow cannot be reused simply by applying the shareIn or stateIn operators.

class UserRepository(
    private val userEventsDataSource: FirestoreUserEventDataSource
) {
    // New collectors register as a new callback in Firestore.
    // Because this function depends on a `userId`, the flow cannot be
    // reused by calling shareIn or stateIn inside this function:
    // doing so would create a new SharedFlow or StateFlow on every call.
    fun getUserEvents(userId: String): Flow<UserEventsResult> =
        userEventsDataSource.getObservableUserEvents(userId)
}

How you optimize this use case depends on the requirements of your application:

  • Do you allow receiving events from multiple users at the same time? If so, you may need to create a map of SharedFlow/StateFlow instances, then remove the reference and cancel the upstream flow when subscriptionCount reaches 0.
  • If you allow only one user, and all collectors need to switch to observing the new user, you can emit event updates to a single SharedFlow/StateFlow common to all collectors, and keep that common flow as a variable in the class.
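One possible shape for the first option is sketched below, assuming only kotlinx.coroutines. UserEventsCache, loadEvents, and cacheDemo are hypothetical names, and strings stand in for UserEventsResult. For brevity the sketch never removes entries from the map: it relies on WhileSubscribed to stop the upstream when the last collector leaves, so the cleanup triggered by subscriptionCount reaching 0 is left out.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical cache: one shared flow per userId, created on first request.
class UserEventsCache(
    private val scope: CoroutineScope,
    private val loadEvents: (String) -> Flow<String>   // stand-in for the data source call
) {
    private val flows = ConcurrentHashMap<String, SharedFlow<String>>()

    fun eventsFor(userId: String): SharedFlow<String> =
        flows.computeIfAbsent(userId) { id ->
            loadEvents(id).shareIn(scope, SharingStarted.WhileSubscribed(), replay = 1)
        }
}

// Returns [same user reuses the flow?, different users share a flow?, upstream flows created].
fun cacheDemo(): List<Any> {
    val created = AtomicInteger(0)
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val cache = UserEventsCache(scope) { id ->
        created.incrementAndGet()
        flowOf("event for $id")
    }
    val sameUser = cache.eventsFor("a") === cache.eventsFor("a")
    val otherUser = cache.eventsFor("a") === cache.eventsFor("b")
    scope.cancel()
    return listOf(sameUser, otherUser, created.get())
}

fun main() {
    println(cacheDemo())
}
```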

The shareIn and stateIn operators can be used with cold flows to improve performance, to add a buffer while no collectors are present, or directly as a caching mechanism. Use them with care, and don't create new flow instances on every function call: doing so wastes resources and can lead to unexpected bugs!