An asynchronous stream of coroutines

Why asynchronous flows?

Suspending functions can asynchronously return a single value, but what if we need to return multiple values? This is where asynchronous flows come in.

Without asynchronous flows, we would use the traditional approach:

private fun simple(): List<Int> {
   val list = mutableListOf<Int>()
   for (i in 1..3) {
      TimeUnit.MILLISECONDS.sleep(200)
      list.add(i)
   }
   return list
}

fun main() = simple().forEach { value -> println(value) }

As we learn more, there is a better way to return the values one at a time:

private fun simple(): Sequence<Int> = sequence {
   for (i in 1..3) {
      TimeUnit.MILLISECONDS.sleep(200)
      yield(i)
   }
}

fun main(): Unit = runBlocking {
   simple().forEach { i -> log("$i") }
}

As mentioned earlier, the benefit of a Sequence is clear: there is no intermediate temporary collection, and no element has to wait for the others before moving on to the next step.

If you compare the two approaches, you will see that the List version prints all its values (almost) at once, while the Sequence version prints them one by one.

This illustrates two styles of processing. The first is collection-style: every element goes through one function, such as filter, the filtered result is then treated as a whole, and only then does the next function, such as map, operate on it.

The second is element-style: each element is independent. Instead of waiting for the whole collection to finish filter, every element runs through filter and map on its own and is finally printed by forEach, without waiting for the other elements.
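
To make the difference concrete, here is a minimal sketch (using plain println instead of the log helper): the List version runs each operator over the whole collection before the next one starts, while the Sequence version pushes each element through the whole chain on its own.

fun main() {
    // Collection style: filter runs for every element, then map runs for every element
    listOf(1, 2, 3)
        .filter { println("List filter $it"); it % 2 == 1 }
        .map { println("List map $it"); it * 10 }
        .forEach { println("List forEach $it") }

    // Element style: each element goes through filter -> map -> forEach on its own
    sequenceOf(1, 2, 3)
        .filter { println("Seq filter $it"); it % 2 == 1 }
        .map { println("Seq map $it"); it * 10 }
        .forEach { println("Seq forEach $it") }
}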

Now that we have learned coroutines, we can see that TimeUnit.MILLISECONDS.sleep(200) in the code above blocks the calling thread. That is a problem: if this code runs on the UI thread, the UI freezes during the sleep and the user sees the app hang. So let's rewrite it with coroutines.

private fun simple() = flow<Int> {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}

fun main(): Unit = runBlocking {
   simple().collect { value -> log("$value") }
}

The advantage is that the calling thread is not blocked while delay runs: the coroutine simply suspends, leaving the thread free to do other work.
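
A small sketch to see this in action, adapted from the usual Flow examples: a second coroutine on the same thread keeps printing while the flow is suspended in delay.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    // A sibling coroutine on the same (main) thread keeps running while the flow suspends
    launch {
        repeat(3) {
            println("I'm not blocked $it")
            delay(100)
        }
    }
    flow {
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }.collect { value -> println(value) }
}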

Source:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

The block parameter is already declared suspend, so we don't need to mark anything extra as suspend ourselves.

A Flow produces values with emit and consumes them with collect.

Flows are cold

Like a Sequence, the code inside a flow builder runs only when collect (a terminal operation) is called.

private fun simple(): Flow<Int> = flow {
   log("Flow started")
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}

fun main() = runBlocking<Unit> {
   log("Calling simple function...")
   val flow = simple()
   log("Calling collect...")
   flow.collect { value -> log("$value") }
   log("Calling collect again...")
   flow.collect { value -> log("$value") }
}

So the flow only prints "Flow started" and 1, 2, 3 when collect is called, and since the example above collects twice, it prints two rounds of 1, 2, 3.

Cancelling a flow

Cancelling a flow with a timeout

private fun simple() = flow<Int> {
   for (i in 1..3) {
      delay(1000)
      log("emit $i")
      emit(i)
   }
}

fun main(): Unit = runBlocking {
   withTimeout(2500) {
      simple().collect { value -> log("$value") }
   }
}

Flow cancellation detection

The flow builder calls ensureActive on every emit, which means a busy loop inside the builder can be cancelled.

fun foo() = flow<Int> {
   for (i in 1..5) {
      log("emit $i")
      emit(i)
   }
}

fun main(): Unit = runBlocking {
   foo().collect { value ->
      if (value == 3) {
         cancel()
      }
      log("main $value")
   }
}
[main] emit 1
[main] main 1
[main] emit 2
[main] main 2
[main] emit 3
[main] main 3
[main] emit 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@3712b94

We cancelled the coroutine in main, so the console output ends at emit 4: that emit checks ensureActive, notices the cancellation, and throws.

However, for performance reasons, some flow builders skip this check, for example IntRange.asFlow():

// asFlow() on a range does not check for cancellation on each emit
private fun simple02() = (1..20).asFlow()

fun main(): Unit = runBlocking {
   simple02().collect {
      if (3 == it) {
         cancel()
      }
      log(it)
   }
}
[main] 1
[main] 2
[main] 3
[main] 4
... ([main] 5 through [main] 19 omitted)
[main] 20
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@5d76b067

Making a busy flow cancellable

As mentioned above, cancellation is cooperative and relies on ensureActive being checked, so we can add the check ourselves:

private fun simple03() = (1..20).asFlow()

fun main(): Unit = runBlocking {
   simple03().onEach { currentCoroutineContext().ensureActive() }.collect { value ->
      if (value == 3) {
         cancel()
      }
      log(value)
   }
}

The flow can then be cancelled

Kotlin also provides the cancellable() operator to add this cancellation check:

private fun simple03() = (1..20).asFlow()

fun main(): Unit = runBlocking {
   simple03().cancellable().collect { value ->
      if (value == 3) {
         cancel()
      }
      log(value)
   }
}
[main] 1
[main] 2
[main] 3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@4e7dc304

The underlying principle is the same in both cases

How to create a flow

  1. flow {}
  2. flowOf
  3. .asFlow()

listOf(1, 2, 3).asFlow().collect { value -> log("$value") }
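
For reference, a minimal sketch showing all three builders side by side:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    // 1. the flow {} builder
    flow {
        emit(1)
        emit(2)
    }.collect { println("flow {}: $it") }

    // 2. flowOf for a fixed set of values
    flowOf("a", "b", "c").collect { println("flowOf: $it") }

    // 3. .asFlow() on collections, ranges, sequences, etc.
    (1..3).asFlow().collect { println("asFlow: $it") }
}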

Intermediate stream operators

These operators work just like the intermediate operations on a Sequence: nothing runs until a terminal operator is applied, and then each element flows through the whole chain individually (apart from a few special operators).

arrayOf("zhazha", "heihei", "xixi").asFlow().filter { s -> s.startsWith("z") }
   .map { value -> value.first().toUpperCase() }.collect { value -> log("$value") }

The transform operator

suspend fun performRequest(request: Int): String {
   delay(1000)
   return "response $request"
}

fun main(): Unit = runBlocking {
   (1..3).asFlow().transform { value ->
      emit("zhazha")
      emit(performRequest(value))
   }.collect { log(it) }
}

transform lets us emit arbitrary values, here an extra string before each mapped element, rather than exactly one output per input.

Size-limiting operators

private fun number() = flow<Int> {
   try {
      emit(1)
      emit(2)
      log("zhazha")
      emit(3)
   } finally {
      log("finally xixi")
   }
}

fun main(): Unit = runBlocking {
   // take(2) cancels the flow after the second element, so "zhazha" is never
   // logged, emit(3) never runs, and the finally block still executes
   val sum = number().take(2).reduce { accumulator, value ->
      accumulator + value
   }
   println(sum)
}

Terminal stream operators

  1. toList toSet
  2. first single
  3. reduce fold

In general, these behave like their List and Sequence counterparts.
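
A quick sketch of a few of these terminal operators on a small flow (the expected results are in the comments):

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    val numbers = (1..5).asFlow()

    println(numbers.toList())                                  // [1, 2, 3, 4, 5]
    println(numbers.first())                                   // 1
    println(numbers.reduce { acc, value -> acc + value })      // 15
    println(numbers.fold(100) { acc, value -> acc + value })   // 115
}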

collect and emit must run in the same coroutine context

A flow preserves the context of its collector; if emit happens in a different coroutine context, an error is thrown:

private fun simple() = flow {
   withContext(Dispatchers.Default) {
      for (i in 1..3) {
         delay(100)
         emit(i)
      }
   }
}

fun main(): Unit = runBlocking {
   simple().collect { log("$it") }
}
java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@1e21b0c, BlockingEventLoop@3a77c1e4],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@342dae2a, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead

The exception message itself points at the fix: use flowOn.

flowOn

flowOn changes the context in which the upstream part of the flow is executed to the given context.

private fun simple() = flow {
   println("${Thread.currentThread()}: b")
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}.flowOn(Dispatchers.Default)

fun main(): Unit = runBlocking {
   println("${Thread.currentThread()}: a")
   simple().collect { println("${Thread.currentThread()}: $it") }
}

flowOn calls can be combined:

withContext(Dispatchers.Main) {
    val singleValue = intFlow // will be executed on IO if context wasn't specified before
        .map { ... } // will be executed on IO
        .flowOn(Dispatchers.IO)
        .filter { ... } // will be executed on Default
        .flowOn(Dispatchers.Default)
        .single() // will be executed on Main
}

The comments in the snippet above show which dispatcher each stage runs on: each flowOn applies to everything upstream of it.

Buffering with buffer

private fun simple(): Flow<Int> = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}

fun main() = runBlocking<Unit> {
   val time = measureTimeMillis {
      simple().collect { value ->
         delay(300)
         println(value)
      }
   }
   println("Collected in $time ms")
}

Run sequentially, this takes about 1200 ms (three times 100 ms + 300 ms), but we can do better with buffer.

flowOn uses the same buffering mechanism under the hood when it changes the dispatcher; there it is implicit, here we request it explicitly.

private fun simple(): Flow<Int> = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}

fun main() = runBlocking<Unit> {
   val time = measureTimeMillis {
      simple().buffer().collect { value ->
         delay(300)
         println(value)
      }
   }
   println("Collected in $time ms")
}

With simple().buffer().collect, the execution time drops to roughly 1000 ms.

for (i in 1..3) {
  delay(100)
  emit(i)
}

With buffer, the emitting code above runs concurrently with the collector: after the first 100 ms wait, the remaining emissions overlap with the collector's 300 ms of processing.

So the total is one 100 ms wait plus three 300 ms waits, about 1000 ms, saving two of the 100 ms waits.
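
As noted above, flowOn buffers implicitly when it switches dispatchers, so a variant like the following sketch should also finish in roughly 1000 ms (plus some dispatch overhead):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

private fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}.flowOn(Dispatchers.Default) // the emitter runs on Default, buffered towards the collector

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}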

We can also move the collection into its own coroutine with launch, so the surrounding code does not wait for it:

private fun simple(): Flow<Int> = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}

fun main() = runBlocking<Unit> {
   val time = measureTimeMillis {
      launch {
         simple().buffer().collect { value ->
            delay(300)
            log(value)
         }
      }
   }
   println("Collected in $time ms")
}

Skipping intermediate values with conflate

fun main() = runBlocking<Unit> {
   val time = measureTimeMillis {
      simple().conflate().collect { value ->
         delay(300)
         log("$value")
      }
   }
   println("Collected in $time ms")
}
[main] Collected in 2 ms
[main] 1
[main] 2
[main] 3

Combining multiple streams: flatMapConcat

fun requestFlow(i: Int): Flow<String> = flow {
   emit("$i: first")
   delay(500)
   emit("$i: second")
}

Without it, we would have to write something like this:

println(measureTimeMillis {
   (1..3).asFlow().onEach { delay(100) }.map { requestFlow(it) }.collect { value ->
      value.collect {
         log("a $it")
      }
   }
})

Using map directly produces a Flow of Flow<String>, which is why the code above has to call collect twice.

This is where flatMapConcat comes in handy

println(measureTimeMillis {
   (1..3).asFlow().onEach { delay(100) }.flatMapConcat { requestFlow(it) }.collect { log("b $it") }
})

flatMapMerge: combining streams concurrently

Another flattening operator, flatMapMerge, collects the inner flows concurrently and merges them into a single stream. Its default concurrency limit is 16, which on the JVM can be changed through the system property named by DEFAULT_CONCURRENCY_PROPERTY_NAME.

println(measureTimeMillis {
   (1..3).asFlow().onEach { delay(100) }.flatMapMerge { requestFlow(it) }.collect { value -> log(value) }
})

You will notice that the first two approaches take around 1800-1900 ms, whereas this one usually finishes in around 900 ms.
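
If a single call needs a different limit, flatMapMerge also accepts a concurrency parameter. A sketch (requestFlow is the same as above, and the value 2 is arbitrary; flatMapMerge is still marked as a preview API in kotlinx.coroutines):

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: first")
    delay(500)
    emit("$i: second")
}

fun main(): Unit = runBlocking {
    (1..3).asFlow()
        .onEach { delay(100) }
        .flatMapMerge(concurrency = 2) { requestFlow(it) } // collect at most 2 inner flows at a time
        .collect { println(it) }
}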

Flow exceptions

private fun simple(): Flow<Int> = flow {
   for (i in 1..3) {
      delay(100)
      check(i <= 1) { "collection $i" }
      emit(i)
   }
}

fun main(): Unit = runBlocking {
   try {
      simple().collect { value -> log(value) }
   } catch (e: Exception) {
      log(e)
   }
}
[main] 1
[main] java.lang.IllegalStateException: collection 2
private fun simple(): Flow<String> = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}.map { value ->
   check(value <= 1) { "Wrong, $value" }
   "zhazha"
}

fun main(): Unit = runBlocking {
   try {
      simple().collect { value -> log(value) }
   } catch (e: Exception) {
      log(e)
   }
}
[main] zhazha
[main] java.lang.IllegalStateException: Wrong, 2

Both versions throw, and wrapping collect in try/catch works, but that is not the usual style. We can use the flow's own catch operator instead:

private fun simple(): Flow<String> = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}.catch { cause -> println("cause $cause") }.map { value ->
   check(value <= 1) { "Wrong, $value" }
   "zhazha"
}

But this version cannot catch the exception: catch only handles exceptions thrown upstream of it, and here the exception is thrown in map, which comes after the catch.

private fun simple(): Flow<String> = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}.map { value ->
   check(value <= 1) { "Wrong, $value" }
   "zhazha"
}.catch { cause -> println("cause $cause") }

With catch placed after map, the exception is caught.

Of course, now that we’ve caught the exception, we have several options, such as:

  1. Emit a fallback value into the stream
  2. Rethrow the exception
  3. Ignore it, write it to a log, or do something else (see the sketch below)
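
A minimal sketch of these options inside a catch block (the fallback value -1 and the messages are just for illustration):

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    flow {
        emit(1)
        error("Something went wrong")
    }.catch { cause ->
        println("Logged: $cause")   // 3. just log it (or report it somewhere else)
        emit(-1)                    // 1. emit a fallback value into the stream
        // throw cause              // 2. or rethrow / wrap the exception
    }.collect { value -> println(value) }
}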

Declarative exception catching

Move the processing into onEach, put catch after it, and then trigger everything with a bare collect():

private fun simple() = flow {
   for (i in 1..3) {
      delay(100)
      emit(i)
   }
}

fun main(): Unit = runBlocking {
   simple().onEach {
      check(it <= 2)
      log(it)
   }.catch { cause -> log("Cause $cause") }.collect()
}

Flow operators that start with on act like event hooks, much like onClick in JavaScript responds to a mouse click: onEach is invoked for each element as it passes through.
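
A small sketch of a few of these on-prefixed hooks (onCompletion is covered in the next section):

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    (1..3).asFlow()
        .onStart { println("onStart: before the first element") }
        .onEach { println("onEach: $it") }
        .onCompletion { println("onCompletion: after the last element") }
        .collect()
}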

Flow completion (imperative and declarative)

Imperative

private fun simple() = (1..3).asFlow()

fun main(): Unit = runBlocking {
   try {
      simple().collect { value -> log(value) }
   } finally {
      log("Done")
   }
}

Declarative

private fun simple() = (1..3).asFlow()

fun main(a): Unit = runBlocking {
   simple().onCompletion { cause ->
      log("Done")
   }.collect { value -> log(value) }
}

The advantage of the declarative form is that onCompletion also tells us whether the flow completed with an exception:

private fun simple() = flow {
   for (i in (1..3)) {
      delay(100)
      check(i <= 1) { "Error" }
      emit(i)
   }
}

fun main(): Unit = runBlocking {
   simple().onCompletion { cause ->
      if (cause != null) {
         log("Flow completed with $cause")
      }
   }.catch { cause ->
      log("Caught exception $cause")
   }.collect { value -> log(value) }
}

Running a flow in a given coroutine scope: launchIn

The drawback of a plain collect is that the code after it has to wait for the whole flow to finish:

// Simulate an event stream
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
   events()
      .onEach { event -> println("Event: $event") }
      .collect() // <-- the coroutine waits here until the flow completes
   println("Done")
}
[main] Event: 1
[main] Event: 2
[main] Event: 3
[main] Done

If you don't want to wait at collect, you can use launchIn(coroutineScope) to run the collection in a separate coroutine:

// Simulate an event stream
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main(): Unit = runBlocking {
   events().onEach { event -> log("Event: $event") }
      .launchIn(this)
   log("Done")
}
[main] Done
[main] Event: 1
[main] Event: 2
[main] Event: 3

Done is printed first

We can also give the stream to another coroutine to execute in the background

fun main(): Unit = runBlocking {
   val job = events().onEach { event -> log("Event: $event") }
      .launchIn(CoroutineScope(Dispatchers.Default))
   log("Done")
   job.join()
}

The interceptor

This section is extra material that digs into the coroutine source code. If you are new to coroutines, feel free to skip it and come back once the earlier material has sunk in.

What does the interceptor do?

A ContinuationInterceptor in Kotlin intercepts the resume call of a continuation, which means it can intercept our coroutine and hand execution off to whatever thread we want it to run on.
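
As a rough sketch of that idea (not how the kotlinx.coroutines dispatchers are actually implemented, just the principle), an interceptor can wrap every continuation so that resumption happens on a thread of our choosing; SingleThreadInterceptor and the thread name my-worker are made up for this example:

import java.util.concurrent.Executors
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical example: resume every continuation on a single background thread
class SingleThreadInterceptor : ContinuationInterceptor {
    private val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "my-worker") }

    override val key: CoroutineContext.Key<*>
        get() = ContinuationInterceptor

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext
                get() = continuation.context

            override fun resumeWith(result: Result<T>) {
                // Hand the resume call over to our worker thread
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

Passing it to launch, e.g. launch(SingleThreadInterceptor()) { ... }, should make the body run on the my-worker thread instead of the caller's thread.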

How does the interceptor work?

fun main(): Unit = runBlocking {
	launch(LogInterceptor()) {
		log("launch ...")
	}
}

class LogInterceptor : ContinuationInterceptor {
	override val key: CoroutineContext.Key<*>
		get() = ContinuationInterceptor
	
	override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
		return LogContinuation(continuation)
	}
}

class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> {
	override val context: CoroutineContext
		get() = continuation.context
	
	override fun resumeWith(result: Result<T>) {
		log("intercept before")
		continuation.resumeWith(result)
		log("intercept after")
	}
}

First we need to know what a Continuation is.

public interface Continuation<in T> {
    public val context: CoroutineContext

    public fun resumeWith(result: Result<T>)
}

It is an interface that holds a coroutine context plus a resumeWith function, and resumeWith is the callback that resumes the coroutine with a result.
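
A tiny sketch of that callback role: suspendCoroutine hands us the current Continuation, and calling resume on it is exactly what resumes the suspended coroutine with a value.

import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

suspend fun answer(): Int = suspendCoroutine { continuation ->
    // In real code this resume would typically happen from an asynchronous callback
    continuation.resume(42)
}

fun main() = runBlocking<Unit> {
    println(answer()) // prints 42
}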

Now let's dive into the source code.

Start with the launch function:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // combine the CoroutineContexts, where a CoroutineContext can be treated like an ArrayList of Elements
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Our LogInterceptor is stored in this context; now we just have to follow the call chain to see where the ContinuationInterceptor actually intercepts.

Note that start defaults to CoroutineStart.DEFAULT.

newContext combines multiple Element entries and can be thought of as a list (as mentioned earlier, coroutine contexts can be combined).
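
A quick sketch of combining context elements with plus (the name combined-demo is arbitrary):

import kotlinx.coroutines.*

fun main(): Unit = runBlocking {
    // Elements are combined with +; the interceptor (here the dispatcher) is kept last internally
    val context = Dispatchers.Default + CoroutineName("combined-demo")
    launch(context) {
        println("name = ${coroutineContext[CoroutineName]?.name}, thread = ${Thread.currentThread().name}")
    }
}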

The following is the context-combining function (plus); you can skip over it.

public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed.minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }

Back to the core code that needs to be analyzed

coroutine.start(start, coroutine, block). Remember that the second argument, coroutine, carries the context that contains our LogInterceptor.

internal fun <T, R> startCoroutineImpl(
    start: CoroutineStart,
    receiver: R,
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)?,
    block: suspend R.() -> T
) = when (start) {
    CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion, onCancellation)
    CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
    CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
    CoroutineStart.LAZY -> Unit // will start lazily
}
Copy the code

Since start is CoroutineStart.DEFAULT, we take this branch:

CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion, onCancellation)

Finally we found this code:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

We won't analyze createCoroutineUnintercepted here; the interesting part for us is intercepted(), so let's follow that:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

Stepping in, we reach the final piece of code:

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

The ContinuationInterceptor key used for the lookup here is exactly the key we return from our interceptor:

class LogInterceptor : ContinuationInterceptor {
	override val key: CoroutineContext.Key<*>
		get() = ContinuationInterceptor
}

The keys match, so context[ContinuationInterceptor] is conceptually just a keyed lookup, something like:

// pseudocode: the context is a collection of Elements, looked up by their Key
val context: CoroutineContext = ...
val intercepted = context[ContinuationInterceptor] // returns our LogInterceptor

So we get our LogInterceptor back, and its interceptContinuation wraps the original continuation in our LogContinuation, which is then returned.

It will eventually come back to this:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

Reading the chained call with that in mind, it is effectively logContinuation.resumeCancellableWith(Result.success(Unit), onCancellation), where logContinuation is the LogContinuation our interceptor returned.

Here is resumeCancellableWith():

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

Our LogContinuation is not a DispatchedContinuation, so the when expression falls through to else -> resumeWith(result).

And that lands back in our own code:

class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> {
	override val context: CoroutineContext
		get() = continuation.context
	
	// The call chain ends up back in this function
	override fun resumeWith(result: Result<T>) {
		log("intercept before")
		continuation.resumeWith(result)
		log("intercept after")
	}
}

From there, resumeWith runs the logging we defined around the original continuation's resume, which completes the interception.