Flow thread operation
7.1 Simpler thread switching
Compared with the steep learning curve of multithreading in RxJava, thread switching in Flow is much friendlier.
An earlier article, Kotlin Coroutines Flow series (1) Basic use of Flow, introduced thread switching in Flow and the flowOn operator.
Flow only needs the flowOn operator; unlike RxJava, it does not require a deep understanding of the difference between observeOn and subscribeOn.
7.2 flowOn VS RxJava observeOn
RxJava’s observeOn operator receives a Scheduler parameter and specifies that downstream operations run on that Scheduler.
Flow’s flowOn operator receives a CoroutineContext parameter and affects upstream operations.
For example:
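import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO) // applies to the flow builder and map above
        .collect {
            // collect is downstream of flowOn and runs in the runBlocking context
            println("${Thread.currentThread().name}: $it")
        }
}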
Both the flow builder and the map operator are affected by flowOn and run on the Dispatchers.IO thread pool, while collect still runs in the caller’s context (the runBlocking thread here).
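To make the upstream-only effect of flowOn visible, the following minimal sketch records the thread used upstream of flowOn and the thread used by the collector. The ThreadTrace class and the traceFlowOn function are hypothetical names introduced only for this illustration; the exact thread names depend on the runtime.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for this sketch: where the upstream stage and the collector ran.
data class ThreadTrace(val upstreamThread: String, val collectorThread: String)

fun traceFlowOn(): List<ThreadTrace> = runBlocking {
    // Only the collector touches this list, so no synchronization is needed.
    val traces = mutableListOf<ThreadTrace>()
    flow {
        for (i in 1..3) emit(i)
    }.map {
        // Upstream of flowOn: runs on a Dispatchers.IO worker thread.
        Thread.currentThread().name
    }.flowOn(Dispatchers.IO)
        .collect { upstreamThread ->
            // Downstream of flowOn: runs in the caller's context (runBlocking here).
            traces += ThreadTrace(upstreamThread, Thread.currentThread().name)
        }
    traces
}

fun main() {
    traceFlowOn().forEach(::println)
}
```

Every printed entry should show two different thread names: a worker thread for the upstream stage and the calling thread for the collector. This is the mirror image of observeOn in RxJava, which moves what comes after it rather than what comes before it.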
Multiple flowOn operators can also be chained. For example:
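import java.util.concurrent.Executors
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO) // flow builder and first map
        .map {
            it + 1
        }
        .flowOn(customerDispatcher) // second map
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
    customerDispatcher.close() // shut down the pool so the JVM can exit
}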
The flow builder and both map operators are affected by the two flowOn operators: the flow builder and the first map operator run on Dispatchers.IO, as in the previous example, while the second map operator switches to the specified customerDispatcher thread pool.
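The rule that each flowOn only governs the operators above it, up to the previous flowOn, can be checked with a small sketch. StageThreads, traceTwoFlowOns, and the thread name "custom-pool" are assumptions made for this illustration and are not part of the original example.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical record for this sketch: the thread each stage ran on.
data class StageThreads(val firstMap: String, val secondMap: String, val collector: String)

fun traceTwoFlowOns(): StageThreads {
    // A single named thread makes the custom dispatcher easy to recognize.
    val customDispatcher = Executors
        .newSingleThreadExecutor { r -> Thread(r, "custom-pool") }
        .asCoroutineDispatcher()
    return try {
        runBlocking {
            flowOf(1)
                // Governed by flowOn(Dispatchers.IO).
                .map { Thread.currentThread().name }
                .flowOn(Dispatchers.IO)
                // Governed by flowOn(customDispatcher).
                .map { first -> first to Thread.currentThread().name }
                .flowOn(customDispatcher)
                // Below the last flowOn: runs in the caller's context.
                .map { (first, second) ->
                    StageThreads(first, second, Thread.currentThread().name)
                }
                .single()
        }
    } finally {
        customDispatcher.close() // shut the executor down so the JVM can exit
    }
}

fun main() {
    println(traceTwoFlowOns())
}
```

Only the secondMap field should carry the custom-pool thread name; the first map reports a Dispatchers.IO worker and the collector reports the calling thread.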
7.3 Using buffer for concurrent operations
Kotlin Coroutines Flow series (2) Flow VS RxJava2 introduced the buffer operator, which corresponds to the buffer strategy of RxJava’s backpressure support.
In fact, the buffer operator can also execute tasks concurrently. It is an alternative to the flowOn operator, except that it cannot explicitly specify a dispatcher.
For example:
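import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
            .buffer() // the emitter now runs concurrently with the collector
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("Collected in $time ms")
}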
Execution Result:
1
2
3
4
5
Collected in 1676 ms
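In the example above, the delays add up to 2000 ms (5 × 100 ms in the flow builder plus 5 × 300 ms in collect). With the buffer operator, emission runs concurrently with collection while collect still processes the values sequentially, so the total drops to roughly 1700 ms: about 100 ms until the first value arrives, then 5 × 300 ms of collection, plus some overhead.
If you remove the buffer operator: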
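import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
            // no buffer: each emit waits until collect has finished the previous value
            .collect { value ->
                delay(300)
                println(value)
            }
    }
    println("Collected in $time ms")
}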
Execution Result:
1
2
3
4
5
Collected in 2039 ms
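Without buffer, the run took about 360 ms longer (2039 ms versus 1676 ms), because each emit now waits for collect to finish processing the previous value.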
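Since this section describes buffer as an alternative to flowOn, it may help to time both against the plain sequential flow. The sketch below reuses the 100 ms producer and 300 ms consumer from the examples in this section; the helper names numbers and timeCollect, and the choice of Dispatchers.Default for the flowOn variant, are assumptions made for this illustration.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same producer as in this section's examples: five values, 100 ms apart.
fun numbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Collects the given flow with a 300 ms consumer and returns the elapsed time in ms.
fun timeCollect(source: Flow<Int>): Long = runBlocking {
    measureTimeMillis {
        source.collect { delay(300) }
    }
}

fun main() {
    println("sequential: ${timeCollect(numbers())} ms")
    println("buffer():   ${timeCollect(numbers().buffer())} ms")
    println("flowOn():   ${timeCollect(numbers().flowOn(Dispatchers.Default))} ms")
}
```

The buffer and flowOn variants should finish in a similar time, both clearly faster than the sequential one, because in both cases the producer no longer waits for the consumer. The difference is that only flowOn lets you choose where the producer runs.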
7.4 Parallel Operations
Before we look at parallel operations, let’s look at the difference between concurrency and parallelism.
Concurrency: a single processor handles multiple tasks over the same period of time by switching between them.
Parallelism: multiple processors or cores handle different tasks at the same instant. Parallelism means that several events literally happen at the same time, so it implies concurrency, whereas concurrency does not necessarily imply parallelism.
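The distinction can be illustrated with coroutines. In the minimal sketch below, two coroutines take turns on the single thread that runBlocking uses, which is concurrency without parallelism. The function name interleaveOnOneThread and the labels A and B are made up for this illustration.

```kotlin
import kotlinx.coroutines.*

// Runs two coroutines on the single runBlocking thread.
// Returns the order of their steps and the set of threads that executed them.
fun interleaveOnOneThread(): Pair<List<String>, Set<Thread>> = runBlocking {
    val log = mutableListOf<String>()     // no locking needed: only one thread touches it
    val threads = mutableSetOf<Thread>()
    val jobs = listOf("A", "B").map { name ->
        launch {
            repeat(2) { step ->
                log += "$name$step"
                threads += Thread.currentThread()
                yield() // hand the thread over to the other coroutine
            }
        }
    }
    jobs.joinAll()
    log to threads
}

fun main() {
    val (log, threads) = interleaveOnOneThread()
    println(log)          // [A0, B0, A1, B1]
    println(threads.size) // 1
}
```

The steps interleave even though only one thread is involved. Launching the same coroutines on a multi-threaded dispatcher such as Dispatchers.Default would additionally allow them to run in parallel on a multicore machine, at which point the shared list would need synchronization.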
RxJava can achieve parallelism with the flatMap operator as well as with the ParallelFlowable class.
The following example uses the flatMap operator to achieve parallelism in RxJava:
// RxJava 2 types (io.reactivex): Observable, ObservableSource, Schedulers, Function, Consumer
Observable.range(1, 100)
        .flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                // Each inner Observable subscribes on the io scheduler
                return Observable.just(integer)
                        .subscribeOn(Schedulers.io())
                        .map(new Function<Integer, String>() {
                            @Override
                            public String apply(Integer integer) throws Exception {
                                return integer.toString();
                            }
                        });
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) throws Exception {
                System.out.println(str);
            }
        });
Flow also has the corresponding flatMapMerge operator to achieve parallelism.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val result = arrayListOf<Int>()
    for (index in 1..100) {
        result.add(index)
    }
    result.asFlow()
        .flatMapMerge {
            flow {
                emit(it)
            }
                .flowOn(Dispatchers.IO) // each inner flow emits on Dispatchers.IO
        }
        .collect { println("$it") }
}
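To see what flatMapMerge buys in practice, the following sketch times it against flatMapConcat, which collects the inner flows one after another. The helper slowToString, the function timed, the 100 ms delay, and the concurrency value of 10 are assumptions chosen for this illustration.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for slow work: emits its input as a string after 100 ms.
fun slowToString(n: Int): Flow<String> = flow {
    delay(100)
    emit(n.toString())
}.flowOn(Dispatchers.IO)

// Opt in to both markers; which one these operators carry is assumed
// to depend on the kotlinx.coroutines version.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun timed(parallel: Boolean): Pair<List<String>, Long> = runBlocking {
    val source = (1..10).asFlow()
    val flattened =
        if (parallel) source.flatMapMerge(concurrency = 10) { slowToString(it) }
        else source.flatMapConcat { slowToString(it) }
    var values = emptyList<String>()
    val elapsed = measureTimeMillis { values = flattened.toList() }
    values to elapsed
}

fun main() {
    val (seqValues, seqMs) = timed(parallel = false)
    val (parValues, parMs) = timed(parallel = true)
    println("flatMapConcat: $seqValues in $seqMs ms")
    println("flatMapMerge:  $parValues in $parMs ms")
}
```

With flatMapConcat the ten inner flows run back to back, so the delays add up. With flatMapMerge they are collected at the same time and the total is close to a single delay, but the order of the merged values is not guaranteed.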
Overall, Flow is somewhat more concise than RxJava.
Related articles in the series:
Kotlin Coroutines Flow series (1) Basic use of Flow
Kotlin Coroutines Flow series (2) Flow VS RxJava2
Kotlin Coroutines Flow series (3) exception handling
Kotlin Coroutines Flow series (5) Other operators