Preface

Flow, introduced with Kotlin coroutines, is a refreshing addition. New technology is always attractive, and I recently built a new project scaffold for future applications, choosing Flow as part of the technology stack. While building that infrastructure I fell into a pitfall with Flow, caused by my unfamiliarity with its API, with chained calls, and with extension functions. Google itself is also showing signs of gradually adopting Flow: for example, the demos in the articles "Implementing paged loading with Paging 3" and "In practice | Using Flow in Room" both use Flow.

A brief introduction to Flow and its use

  • Flow: an asynchronous stream. Conceptually it is still a reactive stream, very similar to RxJava, so developers familiar with RxJava can adapt to Flow quickly. Flow provides a rich set of operators, such as map, filter, count, and so on. Compared with RxJava, Flow is simpler to use and makes switching threads easier. A simple example appears after the definitions below.

  • Intermediate operator (sometimes translated as "transition operator"): an operator that is applied to the upstream flow and returns a downstream flow. These operators are also cold, meaning their code does not run unless the flow is "subscribed to" (collected). onStart, catch, onCompletion, map, filter, and so on are intermediate operators.

  • Terminal operator (sometimes translated as "end operator"): a suspending function on a flow that starts collecting it. collect is the most basic terminal operator; first, toList, reduce, and so on are terminal operators too. It plays the role of subscribe in RxJava and can be understood as collecting the final result of the flow.

  • Cold flow: a Flow is a cold stream, similar to a sequence: the upstream code does not run until the flow is collected, just as an RxJava upstream does not run until it is subscribed to.

  • Usage demonstration

For more details, see the article "Best practices for coroutines Flow | Based on the Android Developer Summit app".

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import org.junit.Test

private fun getData(): String {
    Thread.sleep(3000)
    return "Flow test"
}

@Test
fun testFlowNormal() {
    GlobalScope.launch {
        flow { emit(getData()) }
            .flowOn(Dispatchers.IO) // switch threads
            .onStart { println("onStart") }
            .catch { println("catch:${it.message}") }
            .onCompletion { println("onComplete:${it?.message}") }
            .collect { println("result = $it") }
    }
    // Keep the test thread alive until the coroutine has finished
    Thread.sleep(6000)
}

Output results:

onStart
result = Flow test
onComplete:null

Note: catch and onCompletion take effect in the order in which they are called in the chain.
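The effect of that ordering can be seen with a flow that fails. The following is a minimal sketch, assuming kotlinx-coroutines is on the classpath; failingFlow, catchThenCompletion, and completionThenCatch are hypothetical names used only for illustration and are not part of the original project.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a flow that always fails, used only for illustration.
private fun failingFlow(): Flow<String> =
    flow<String> { throw IllegalStateException("boom") }

// catch sits upstream of onCompletion: the exception is already handled
// when onCompletion runs, so onCompletion receives a null cause.
fun catchThenCompletion(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    failingFlow()
        .catch { log += "catch: ${it.message}" }
        .onCompletion { log += "onCompletion: ${it?.message}" }
        .collect()
    log
}

// onCompletion sits upstream of catch: it observes the exception first,
// then the exception continues downstream and is handled by catch.
fun completionThenCatch(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    failingFlow()
        .onCompletion { log += "onCompletion: ${it?.message}" }
        .catch { log += "catch: ${it.message}" }
        .collect()
    log
}

fun main() {
    println(catchThenCompletion())
    println(completionThenCatch())
}
```

In the first chain the log reads "catch: boom" followed by "onCompletion: null"; in the second it reads "onCompletion: boom" followed by "catch: boom".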

Here is part of the source code (name, parameters, and return type) of each method called above. It will be needed later.

  1. flowOn: switches the coroutine context, most often to switch threads. Unless context == EmptyCoroutineContext, the return value is a new Flow object.
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}
  2. onStart: called when the Flow starts executing. The return value is a new Flow object.
public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { ... }
  3. catch: catches exceptions thrown by the Flow. The return value is a new Flow object.
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
    flow {
        val exception = catchImpl(this)
        if (exception != null) action(exception)
    }
  4. onCompletion: called when the Flow finishes executing. The return value is a new Flow object.
public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { ... }
  5. collect: the terminal operator, comparable to subscribe in RxJava.
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

Note that all of the methods above are extension functions on Flow, and that I have deliberately pointed out the return values of several of them. This is where I started digging my way out of the pit.
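The return values described above can be checked with reference equality. This is a minimal sketch, assuming kotlinx-coroutines is on the classpath; operatorIdentity is a hypothetical helper name, and the check relies only on the flowOn source quoted above (EmptyCoroutineContext returns this) and on each operator wrapping its receiver.

```kotlin
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*

// Hypothetical helper: for each operator, report whether the returned
// Flow is the very same object as the one it was called on.
fun operatorIdentity(): Map<String, Boolean> {
    val base: Flow<String> = flow<String> { emit("Flow test") }
    return mapOf(
        // The only case in the quoted flowOn source that returns `this`.
        "flowOn(EmptyCoroutineContext)" to (base.flowOn(EmptyCoroutineContext) === base),
        "flowOn(Dispatchers.IO)" to (base.flowOn(Dispatchers.IO) === base),
        "onStart" to (base.onStart { } === base),
        "catch" to (base.catch { } === base),
        "onCompletion" to (base.onCompletion { } === base)
    )
}

fun main() {
    operatorIdentity().forEach { (name, same) -> println("$name returns same object: $same") }
}
```

Only the flowOn(EmptyCoroutineContext) entry is true; every other operator hands back a different object, which is why the return value must not be discarded.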

Why is the intermediate operator code not executed?

The following code simulates the problem as it appeared in my actual business code:

@Test
fun testFlowSingle() {
    GlobalScope.launch {
        val flow = flow<Any> { emit(getData()) }
        println("flow obj = ${flow.hashCode()}")
        val flowOn = flow.flowOn(Dispatchers.IO)
        println("flowOn obj = ${flowOn.hashCode()}")
        val catch = flow.catch { println("catch: ${it.message}") }
        println("catch obj = ${catch.hashCode()}")
        val onStart = flow.onStart { println("onStart") }
        println("onStart obj = ${onStart.hashCode()}")
        val onCompletion = flow.onCompletion { println("onComplete:${it?.message}") }
        println("onCompletion obj = ${onCompletion.hashCode()}")
        // collect is called on the original flow, not on any operator's result
        flow.collect { println("collect result = $it") }
        println("last obj = ${flow.hashCode()}")
    }
    Thread.sleep(6000)
}

Output results:

flow obj = 1846810080
flowOn obj = 2093635757
catch obj = 438978854
onStart obj = 1552526949
onCompletion obj = 175428485
collect result = Flow test
last obj = 1846810080

As you can see, none of the code blocks passed to onStart/catch/onCompletion printed anything, and flowOn did not switch threads as I expected either. We also printed the hashCode of each Flow object. Comparing them shows that only the object used by the terminal operator is the same as the one created by the initial flow { … }; every other operator created a different object.

Flows are sequential and cold

The official Kotlin documentation seems to explain why the code above did not run: www.kotlincn.net/docs/refere…

Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator.

The key point: each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator.
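The sequential behavior described in that passage can be traced value by value. This is a minimal sketch, assuming kotlinx-coroutines is on the classpath; sequentialTrace and the sample values 1, 2, 3 are hypothetical and chosen only for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which each operator sees each value.
fun sequentialTrace(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .filter {
            log += "filter $it"
            it % 2 == 1 // keep odd values only
        }
        .map {
            log += "map $it"
            "value $it"
        }
        .collect { log += "collect $it" }
    log
}

fun main() {
    sequentialTrace().forEach(::println)
}
```

Each value travels through the whole chain before the next one is emitted: the log shows filter 1, map 1, collect value 1, then filter 2 (dropped), then filter 3, map 3, collect value 3.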

So the question of why flowOn/onStart/catch/onCompletion did not run has its answer: the flows produced by the upstream intermediate operators never reached the terminal operator. Each operator returned a new Flow, but collect was called on the original one. It is as simple as that. This is what the title means: flows are sequential, and they are cold.
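The "cold" half of that statement can also be observed directly. This is a minimal sketch, assuming kotlinx-coroutines is on the classpath; coldTrace is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: shows that the builder body runs only when the flow
// is collected, and runs again for every new collection.
fun coldTrace(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val cold = flow<String> {
        log += "builder started"
        emit("Flow test")
    }
    log += "flow created" // the builder body has not run at this point
    cold.collect { log += "first collect: $it" }
    cold.collect { log += "second collect: $it" }
    log
}

fun main() {
    coldTrace().forEach(::println)
}
```

"flow created" is logged before any "builder started" line, and "builder started" appears once per collect call.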

Finally: Connect the upstream and downstream streams

We now know that the flow must ultimately reach the terminal operator, so we only need to connect the flows returned by each intermediate operator. There are two ways to do this: use a chained call as in the first example, or keep a variable that receives the Flow returned by each intermediate operator. Let's look at the second:

@Test
fun testFlowSingle2() {
    GlobalScope.launch {
        // Reassign the result of every operator back to flow
        var flow = flow<Any> { emit(getData()) }
        println("flow obj = ${flow.hashCode()}")
        flow = flow.flowOn(Dispatchers.IO)
        println("flowOn obj = ${flow.hashCode()}")
        flow = flow.catch { println("catch: ${it.message}") }
        println("catch obj = ${flow.hashCode()}")
        flow = flow.onStart { println("onStart") }
        println("onStart obj = ${flow.hashCode()}")
        flow = flow.onCompletion { println("onComplete:${it?.message}") }
        println("onCompletion obj = ${flow.hashCode()}")
        flow.collect { println("collect result = $it") }
        println("last obj = ${flow.hashCode()}")
    }
    Thread.sleep(6000)
}

Output results:

flow obj = 1846810080
flowOn obj = 2093635757
catch obj = 438978854
onStart obj = 1552526949
onCompletion obj = 175428485
onStart
collect result = Flow test
onComplete:null
last obj = 175428485

As the output shows, all of the intermediate operators now run. Out of the pit!
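For comparison, the first approach (a single chain) might look like the following. This is a minimal sketch under stated assumptions rather than the project's actual code: it uses runBlocking instead of GlobalScope.launch plus Thread.sleep so that it finishes on its own, getData here is a stand-in without the 3-second delay, and chainedTrace is a hypothetical helper name. It also records thread names to check that flowOn moves the builder off the collecting thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Stand-in for the article's getData(), without the sleep (assumption).
private fun getData(): String = "Flow test"

// Returns the log lines plus whether the flow builder ran on a different
// thread than the collector.
fun chainedTrace(): Pair<List<String>, Boolean> = runBlocking {
    val log = mutableListOf<String>()
    val collectorThread = Thread.currentThread().name
    var emitterThread = collectorThread
    flow<String> {
        // This block is upstream of flowOn, so it runs on Dispatchers.IO.
        emitterThread = Thread.currentThread().name
        emit(getData())
    }
        .flowOn(Dispatchers.IO)
        // Everything below flowOn runs in the collector's context.
        .catch { log += "catch: ${it.message}" }
        .onStart { log += "onStart" }
        .onCompletion { log += "onComplete:${it?.message}" }
        .collect { log += "collect result = $it" }
    log.toList() to (emitterThread != collectorThread)
}

fun main() {
    val (log, switched) = chainedTrace()
    log.forEach(::println)
    println("builder ran on another thread: $switched")
}
```

Because every operator is called on the result of the previous one, nothing is discarded, and the log order matches the output above: onStart, the collected result, then onComplete:null.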

Technical discussion is welcome at any time ~