Flow thread operation

7.1 Simpler thread switching

Compared with the learning curve of multithreading in RxJava, thread switching in Flow is much friendlier.

In an earlier article, Kotlin Coroutines Flow series (1) Basic use of Flow, we introduced thread switching in Flow and the flowOn operator.

Flow only needs the flowOn operator, whereas RxJava requires a solid understanding of the difference between observeOn and subscribeOn.

7.2 flowOn VS RxJava observeOn

RxJava's observeOn operator takes a Scheduler parameter, which specifies the scheduler that downstream operations run on.

Flow's flowOn operator takes a CoroutineContext parameter and affects upstream operations.

For example:

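import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO) // the flow builder and map above run on Dispatchers.IO
        .collect {
            // collect runs in the runBlocking context (the main thread)
            println("${Thread.currentThread().name}: $it")
        }
}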

Both the flow builder and the map operator are affected by flowOn and run on the Dispatchers.IO thread pool. The collect block is downstream of flowOn, so it stays in the context of the caller.
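To check which stages a single flowOn moves, one option is to record the thread that runs each stage. The sketch below is only an illustration: the helper name stageThreads and the stage labels are hypothetical, and a second map is placed after flowOn to show that downstream operators are not affected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: records the thread that runs each stage of the pipeline.
fun stageThreads(): Map<String, Thread> = runBlocking {
    val seen = ConcurrentHashMap<String, Thread>()
    flowOf(1, 2, 3)
        .map { seen["upstream map"] = Thread.currentThread(); it * it }
        .flowOn(Dispatchers.IO) // affects only the operators above this line
        .map { seen["downstream map"] = Thread.currentThread(); it + 1 }
        .collect { seen["collect"] = Thread.currentThread() }
    seen
}

fun main() {
    stageThreads().forEach { (stage, thread) ->
        println("$stage ran on ${thread.name}")
    }
}
```

The upstream map should report a worker thread of Dispatchers.IO, while the downstream map and collect share the thread that called runBlocking. This is the opposite direction to RxJava's observeOn, which moves everything after it.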

Another example, this time with two flowOn calls:

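import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors

val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO) // flow builder and first map: Dispatchers.IO
        .map {
            it + 1
        }
        .flowOn(customerDispatcher) // second map: customerDispatcher
        .collect {
            println("${Thread.currentThread().name}: $it")
        }

    customerDispatcher.close() // shut the pool down so the JVM can exit
}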

The flow builder and both map operators are affected by the two flowOn calls: the flow builder and the first map operator run on Dispatchers.IO, as in the previous example, while the second map operator switches to the specified customerDispatcher thread pool.
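With chained flowOn calls, each stage runs on the dispatcher of the nearest flowOn below it. One way to confirm this is to read the dispatcher out of the coroutine context inside each stage. This is a minimal sketch under assumptions: stageDispatchers, label, and the string labels are hypothetical names, and a pool of two threads stands in for the custom dispatcher.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical helper: maps each stage to a label for the dispatcher it runs on.
fun stageDispatchers(): Map<String, String> {
    val custom = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
    val seen = ConcurrentHashMap<String, String>()

    // Looks up the dispatcher of the coroutine that is currently running.
    suspend fun label(): String =
        when (currentCoroutineContext()[ContinuationInterceptor]) {
            Dispatchers.IO -> "IO"
            custom -> "custom"
            else -> "caller"
        }

    try {
        runBlocking {
            flow { seen["builder"] = label(); emit(1) }
                .map { seen["first map"] = label(); it * it }
                .flowOn(Dispatchers.IO)
                .map { seen["second map"] = label(); it + 1 }
                .flowOn(custom)
                .collect { seen["collect"] = label() }
        }
    } finally {
        custom.close() // release the pool threads
    }
    return seen
}

fun main() {
    println(stageDispatchers())
}
```

The builder and the first map should be labelled IO, the second map custom, and collect caller, since nothing below collect changes its context.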

7.3 Implementing concurrency with buffer

In Kotlin Coroutines Flow series (2) Flow VS RxJava2, we introduced the buffer operator, which corresponds to the buffer strategy of RxJava's backpressure.

The buffer operator can also run tasks concurrently. It is an alternative to the flowOn operator, except that it cannot explicitly specify a dispatcher.

For example:

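import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
            .buffer() // run the emitter concurrently with the collector
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("Collected in $time ms")
}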

Execution Result:

1
2
3
4
5
Collected in 1676 ms

In the example above, the delays add up to 2000 ms (5 × 100 ms for emission plus 5 × 300 ms for collection). With the buffer operator, emit runs concurrently with collect while collect itself stays sequential, so the total drops to about 1700 ms: roughly the first 100 ms emission plus 5 × 300 ms of collection.

Now remove the buffer operator:

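import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
            .collect { value ->
                // without buffer, each emit waits for this block to finish
                delay(300)
                println(value)
            }
    }
    println("Collected in $time ms")
}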

Execution Result:

1
2
3
4
5
Collected in 2039 ms

Without buffer, the run took about 360 ms longer (2039 ms versus 1676 ms).
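The two measurements can also be taken side by side in one program, together with a flowOn variant, since flowOn buffers in the same way when it has to change the dispatcher. The following is a sketch under assumptions: the helper names numbers and timeCollect are hypothetical, and Dispatchers.Default is an arbitrary choice for the flowOn case.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Same producer as in the examples above: five values, 100 ms apart.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Collects the flow with a 300 ms consumer and returns the elapsed time in ms.
fun timeCollect(source: Flow<Int>): Long = runBlocking {
    measureTimeMillis {
        source.collect { delay(300) }
    }
}

fun main() {
    println("sequential: ${timeCollect(numbers())} ms")
    println("buffer():   ${timeCollect(numbers().buffer())} ms")
    println("flowOn():   ${timeCollect(numbers().flowOn(Dispatchers.Default))} ms")
}
```

Exact timings vary by machine, but the buffer and flowOn variants should both finish noticeably sooner than the sequential one, because the emitter no longer waits for the collector.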

7.4 Parallel Operations

Before we look at parallel operations, let’s look at the difference between concurrency and parallelism.

Concurrency means that a single processor handles multiple tasks during the same period of time by switching between them.

Parallelism means that multiple processors, or a multicore processor, handle different tasks at the same instant. Parallelism implies concurrency, since parallel events are concurrent events that happen simultaneously, but concurrency does not necessarily imply parallelism.
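Coroutines make the distinction easy to observe. In the sketch below, two coroutines take turns on the single thread owned by runBlocking, so they are concurrent without being parallel. The function name interleavedOnOneThread and the labels A and B are hypothetical, chosen only for this illustration.

```kotlin
import kotlinx.coroutines.*

// Two coroutines interleave on the single thread owned by runBlocking:
// concurrent, but not parallel.
fun interleavedOnOneThread(): List<String> = runBlocking {
    val log = mutableListOf<String>() // safe here: only one thread touches it
    val jobs = listOf("A", "B").map { name ->
        launch {
            repeat(2) { step ->
                log += "$name$step"
                yield() // hand the thread over to the other coroutine
            }
        }
    }
    jobs.joinAll()
    log
}

fun main() {
    println(interleavedOnOneThread())
}
```

The log alternates between the two coroutines even though only one thread is involved. Running the tasks in parallel would additionally require a multithreaded dispatcher such as Dispatchers.Default or Dispatchers.IO.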

RxJava can implement parallelism with the flatMap operator, as well as with the ParallelFlowable class.

The following example uses the flatMap operator to implement parallelism in RxJava:

        Observable.range(1, 100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        // Each inner Observable is subscribed on the io scheduler
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {
                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {
                        System.out.println(str);
                    }
                });

Flow also has the corresponding flatMapMerge operator to achieve parallelism.

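import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val result = arrayListOf<Int>()
    for (index in 1..100) {
        result.add(index)
    }

    result.asFlow()
        .flatMapMerge {
            // Each inner flow is collected concurrently on Dispatchers.IO
            flow {
                emit(it)
            }
                .flowOn(Dispatchers.IO)
        }
        .collect { println("$it") }
}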
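The effect of flatMapMerge is easier to see when each inner flow does blocking work. The sketch below is an illustration under assumptions: slowSquare is a hypothetical stand-in for a blocking call, and parallelSquares is a hypothetical helper that returns the results together with the elapsed time. Processed one after another, ten items at 100 ms each would take about one second.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for blocking work: holds a thread for 100 ms.
fun slowSquare(n: Int): Int {
    Thread.sleep(100)
    return n * n
}

// Squares 1..10 in parallel and returns the results with the elapsed time in ms.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun parallelSquares(): Pair<List<Int>, Long> = runBlocking {
    var results: List<Int> = emptyList()
    val elapsed = measureTimeMillis {
        results = (1..10).asFlow()
            .flatMapMerge { n ->
                // Each inner flow runs its blocking call on a Dispatchers.IO thread.
                flow { emit(slowSquare(n)) }.flowOn(Dispatchers.IO)
            }
            .toList()
    }
    results to elapsed
}

fun main() {
    val (results, elapsed) = parallelSquares()
    // flatMapMerge does not preserve order, so sort before printing.
    println("${results.sorted()} in $elapsed ms")
}
```

Because the inner flows are merged as they complete, the output order is not guaranteed. The number of inner flows collected at once is bounded by the concurrency parameter of flatMapMerge, which is left at its default here.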

Overall, Flow is somewhat more concise than RxJava.

Related articles in the series:

Kotlin Coroutines Flow series (1) Basic use of Flow

Kotlin Coroutines Flow series (2) Flow VS RxJava2

Kotlin Coroutines Flow series (3) exception handling

Kotlin Coroutines Flow series (5) Other operators