Introduction to Kotlin Flow

Flow is part of the kotlinx.coroutines library. It was introduced as a preview in version 1.2.0, and its core API became stable in version 1.3.0.

The official documentation introduces it in one sentence:

Flow: a cold asynchronous stream with the flow builder and a comprehensive operator set (filter, map, etc.).

Judging from the documentation, Flow is similar to an RxJava Observable. The difference is that an Observable can be either cold or hot, whereas a Flow is cold.

II. Basic use of Flow

A flow can emit multiple asynchronously computed values, as in the following use of the flow builder:

        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect {
            println(it)
        }

The Flow interface declares a single function, collect:

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

If you are familiar with RxJava, you can think of collect() as the counterpart of subscribe() and of emit() as the counterpart of onNext().
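
Because a Flow is cold, the block passed to the flow builder does not run when the flow is created; it runs from the start each time collect() is called. The following is a minimal sketch of that behaviour. The names numbers() and builderRuns are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical counter, used only to observe how often the builder block runs.
var builderRuns = 0

// Hypothetical helper: creating the flow does not execute the block.
fun numbers(): Flow<Int> = flow {
    builderRuns++ // executed once per collect(), not when the flow is created
    for (i in 1..3) emit(i)
}

fun main() = runBlocking {
    val flow = numbers()
    println("runs after creation: $builderRuns") // 0, nothing has run yet

    flow.collect { println("first collector: $it") }
    flow.collect { println("second collector: $it") }

    println("runs after two collects: $builderRuns") // 2, one run per collector
}
```

Each collector receives 1, 2, 3 independently, which is the same behaviour as subscribing twice to a cold Observable.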

2.1 Creating a flow

In addition to the flow builder shown above, there are several other ways to create a flow:

flowOf()

    flowOf(1, 2, 3, 4, 5)
        .onEach {
            delay(100)
        }
        .collect {
            println(it)
        }

asFlow()

    listOf(1, 2, 3, 4, 5).asFlow()
        .onEach {
            delay(100)
        }.collect {
            println(it)
        }

channelFlow()

    channelFlow {
        for (i in 1..5) {
            delay(100)
            send(i)
        }
    }.collect {
        println(it)
    }

The last of these, the channelFlow builder, differs from the flow builder in an important way.

A flow is a cold stream. When no context switch is involved, the producer and the consumer run sequentially in the same coroutine, suspending rather than blocking. A Channel is a hot stream. channelFlow passes its values through a channel, so the producer and the consumer run concurrently, still without blocking.

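The following code uses the flow builder and takes approximately 1 second, because the 100 ms delay in the producer and the 100 ms delay in the consumer run one after the other for each of the five values: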

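import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100) // producer: 100 ms per value
                emit(i)
            }
        }.collect {
            delay(100) // consumer: another 100 ms per value, run sequentially
            println(it)
        }
    }

    print("cost $time")
}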

With the channelFlow builder, the same work takes approximately 700 milliseconds, because the consumer processes one value while the producer is already preparing the next:

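import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val time = measureTimeMillis {
        channelFlow {
            for (i in 1..5) {
                delay(100) // producer runs in its own coroutine
                send(i)
            }
        }.collect {
            delay(100) // consumer overlaps with the producer's next delay
            println(it)
        }
    }

    print("cost $time")
}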

Of course, a flow that switches threads with flowOn also takes about 700 milliseconds, much like the channelFlow builder:

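import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {

    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO) // the builder above now runs on Dispatchers.IO
            .collect {
                delay(100)
                println(it)
            }
    }

    print("cost $time")
}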

2.2 Switching Threads

Compared with RxJava, which needs both observeOn and subscribeOn to switch threads, Flow is simpler: flowOn is enough. The following example shows that flowOn affects both the flow builder and the map operator that precede it.

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

The thread on which collect() runs depends on the CoroutineScope (more precisely, the coroutine context) in which the flow is collected.

For example, in the following code collect() runs on the main thread:

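import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            // collect runs in runBlocking's context, that is, on the main thread
            println("${Thread.currentThread().name}: $it")
        }
}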

Execution result:

main: 1
main: 4
main: 9
main: 16
main: 25
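
flowOn only changes the context of the operators upstream of it; operators placed after flowOn keep running in the collector's context. A small sketch of this, assuming it is run from runBlocking on the main thread. threadsPerStage() is a hypothetical helper, and the exact worker thread names depend on the runtime.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: for each value, records the thread of the map placed
// before flowOn and the thread of the map placed after it.
suspend fun threadsPerStage(): List<Pair<String, String>> =
    flowOf(1, 2, 3)
        .map { Thread.currentThread().name } // upstream of flowOn: Dispatchers.IO
        .flowOn(Dispatchers.IO)
        .map { upstream -> upstream to Thread.currentThread().name } // downstream: collector's context
        .toList()

fun main() = runBlocking {
    threadsPerStage().forEach { (upstream, downstream) ->
        println("upstream map on $upstream, downstream map on $downstream")
    }
}
```

The upstream map reports a worker thread of the IO dispatcher, while the downstream map reports the thread that runs the collector.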

Note that you must not use withContext() inside the flow builder to change the context in which values are emitted; use flowOn instead.
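
To illustrate the warning about withContext(): the flow builder checks that values are emitted in the collector's context, so calling emit() inside withContext() with a different dispatcher makes collect() fail with an IllegalStateException. The helper names wrongContextSwitch() and failsWithIllegalState() are made up for this sketch, and it assumes the collector runs in runBlocking rather than on Dispatchers.Default.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits from inside withContext, which violates the
// context-preservation rule of the flow builder.
fun wrongContextSwitch(): Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// Hypothetical helper: returns true if collecting the flow above fails
// with an IllegalStateException.
suspend fun failsWithIllegalState(): Boolean =
    try {
        wrongContextSwitch().collect { }
        false
    } catch (e: IllegalStateException) {
        true
    }

fun main() = runBlocking {
    println("collect failed with IllegalStateException: ${failsWithIllegalState()}")
}
```

Moving the context change out of the builder, with flowOn(Dispatchers.Default) applied after flow { ... }, avoids the exception.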

2.3 Cancelling a flow

A flow follows the cooperative cancellation of coroutines: it can be cancelled while it is suspended in a cancellable suspending function such as delay(); otherwise it cannot be cancelled.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {

    withTimeoutOrNull(2500) { // cancels the block after 2.5 seconds
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }

    println("Done")
}

Execution result:

1
2
Done

2.4 Terminal flow operators

The Flow API is somewhat similar to the Java Stream API: it also has intermediate operations and terminal operations.

A terminal operator of a Flow is either a suspending function, such as collect, single, reduce, or toList, or the launchIn operator, which starts collecting the flow in a specified CoroutineScope.

@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}
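
Because launchIn returns a Job instead of suspending, it is usually combined with onEach, which holds the per-element work. A minimal sketch, assuming the scope is the one provided by runBlocking. launchAndJoin() is a hypothetical helper; with a multi-threaded scope the "before" snapshot would not be deterministic.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: starts collection with launchIn and returns what had
// been received right after launching and again after join().
suspend fun launchAndJoin(scope: CoroutineScope): Pair<List<Int>, List<Int>> {
    val received = mutableListOf<Int>()
    val job: Job = flowOf(1, 2, 3)
        .onEach { received += it } // per-element work goes in onEach
        .launchIn(scope)           // returns immediately with the collecting Job
    val beforeJoin = received.toList()
    job.join()                     // suspend until the flow has completed
    return beforeJoin to received.toList()
}

fun main() = runBlocking {
    val (before, after) = launchAndJoin(this)
    println("before join: $before") // [] because the launched coroutine has not run yet
    println("after join: $after")   // [1, 2, 3]
}
```

The returned Job can also be cancelled, which stops the collection in the same way as cancelling any other coroutine.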

To summarize, the terminal operators of Flow are:

  • collect
  • single/first
  • toList/toSet/toCollection
  • count
  • fold/reduce
  • launchIn/produceIn/broadcastIn
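
A few of the suspending terminal operators from this list applied to the same flow, as a rough sketch. describe() is a hypothetical helper; every call triggers a fresh collection because the flow is cold.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: applies several terminal operators to one cold flow.
suspend fun describe(numbers: Flow<Int>): List<String> {
    val all = numbers.toList()                               // collects every value into a List
    val first = numbers.first()                              // stops after the first value
    val count = numbers.count()                              // number of emitted values
    val sum = numbers.reduce { acc, v -> acc + v }           // starts from the first value
    val sumFrom100 = numbers.fold(100) { acc, v -> acc + v } // starts from the given initial value
    return listOf(
        "toList = $all",
        "first = $first",
        "count = $count",
        "reduce = $sum",
        "fold = $sumFrom100",
    )
}

fun main() = runBlocking {
    describe(flowOf(1, 2, 3, 4, 5)).forEach { println(it) }
    // toList = [1, 2, 3, 4, 5]
    // first = 1
    // count = 5
    // reduce = 15
    // fold = 115
}
```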

Related articles in this series:

Kotlin Coroutines Flow Series (2) Flow vs. RxJava2

Kotlin Coroutines Flow Series (3) Exception handling

Kotlin Coroutines Flow Series (4) Thread operations

Kotlin Coroutines Flow Series (5) Other operators