Coroutine Flow and RxJava are both streaming data processing frameworks, so they are often compared. This article will compare their data types.

The difference between Rx and Flow in single data support

RxJava supports multiple data types

  • Observable: Stream data type
  • Flowable: similar to Observable, but with backpressure support
  • Single: single-shot data type that must emit exactly one item
  • Maybe: single-shot data type that emits zero or one item
  • Completable: emits no data and only signals that the stream has ended

Of these, Single, Maybe, and Completable emit at most one item (single-shot data types). Coroutine Flow, on the other hand, only provides Flow, which corresponds to the streaming types Observable and Flowable (Flow supports backpressure naturally). As a streaming framework, why does Rx need to support single-shot data types when Flow does not?




Rx supports single data for three main reasons (or purposes) that are not a problem for Flow:

Thread switching
  • RxJava (supports single-shot data): RxJava is also an asynchronous framework that provides thread operators such as observeOn and subscribeOn. In the Java era, when handy multi-threading tools were scarce, Rx was one of the best options even for single-shot data.
  • Flow (no single-shot types): In the Kotlin era, coroutines provide enough asynchronous facilities that single-shot data can be handled with suspend functions. Flow's own thread switching is also built on top of coroutines.

Straightforward code
  • RxJava: Operators let single-shot data be processed with chained calls, avoiding callbacks. For example, zip and concat combine single-shot data, and switchIfEmpty implements selection logic.
  • Flow: Coroutines express asynchronous work as synchronous-style calls, so no chained-call syntax is needed to avoid callbacks.

Type conversion
  • RxJava: Many business scenarios convert between single-shot and streaming data, and RxJava provides operators for this. toObservable and flatMapObservable convert single-shot data into a stream; in the other direction, first, toList, and similar operators convert a stream into single-shot data.
  • Flow: Flow also provides conversions in both directions, and they are much simpler. For example, toList directly returns the unwrapped type T, so there is no need to define a dedicated wrapper type for single-shot data.

In summary, RxJava makes up for many gaps in the Java language, and with great power comes great responsibility: Rx has to cover both single-shot and streaming scenarios. Kotlin addresses most asynchronous development needs with coroutines, so Flow only has to deal with streaming data. You can still use Flow to send and receive single-shot data, but this is not officially recommended, and Flow provides no dedicated single-shot types or operators for it.

Next, take a look at how Coroutine supports single data by comparing it to Rx.

Thread switching

Here is an example to compare thread switching between Rx and Coroutine.

First, we simulate a single data request in RxJava:

fun readFromRemoteRx(data: String = "data"): Single<String> {
    return Single.create { it.onSuccess(data) }
        .delay(100, TimeUnit.MILLISECONDS)
        .doOnSuccess { println("read from remote: $it") }
        .subscribeOn(Schedulers.io())
}

As above, delay simulates the delay of I/O and subscribeOn specifies that the data request occurs in the I/O thread.

As mentioned earlier, such things as thread switching are no longer Flow’s primary responsibility. In Coroutine, a single request uses a suspend function:

suspend fun readFromRemote(data: String = "data"): String {
    delay(100)
    println("read from remote: $data")
    return data
}

As above, we use a suspend function to define a single-shot request. Inside a coroutine, it can be moved to an IO thread with withContext.
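The withContext switch mentioned above could look roughly like the sketch below. The wrapper name readFromRemoteOnIo is hypothetical, readFromRemote is repeated so the snippet runs on its own, and the kotlinx-coroutines-core dependency is assumed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Same shape as the article's readFromRemote, repeated so the sketch is self-contained.
suspend fun readFromRemote(data: String = "data"): String {
    delay(100)
    println("read from remote: $data")
    return data
}

// Hypothetical wrapper: runs the request on the IO dispatcher and
// resumes the caller in its original context once the block returns.
suspend fun readFromRemoteOnIo(data: String = "data"): String =
    withContext(Dispatchers.IO) { readFromRemote(data) }

fun main() = runBlocking {
    val result = readFromRemoteOnIo()
    println("result: $result")
}
```

Unlike subscribeOn in Rx, the dispatcher choice here is an ordinary block in the function body, so the caller does not need to know about it.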

Straightforward code

Coroutine’s code for handling single data is much cleaner than Rx’s.

Selection logic

Let’s look at an example of selection logic for single-shot data. In Rx we make the selection with operators:

fun readFromCacheRx(data: String? = null): Maybe<String> {
    return run {
        if (data != null) Maybe.just(data)
        else Maybe.empty()
    }.delay(100, TimeUnit.MILLISECONDS)
        .doOnSuccess { println("read from cache: $it") }
}

fun test() {
    readFromCacheRx(null) // pass "data" to check when cache has data
        .switchIfEmpty(readFromRemoteRx())
        .subscribeOn(Schedulers.io())
        .test()
}

As above, readFromCacheRx uses the Maybe type to simulate the results of a request from a local data source, requesting network remote data when no data is available locally. Rx does conditional selection logic based on switchIfEmpty, otherwise we can only make judgments in asynchronous callbacks.

In the Kotlin era, we implement the selection logic in Coroutine with suspend functions:

suspend fun readFromCache(data: String? = null): String? {
    delay(100)
    println("read from cache: $data")
    return data
}

fun test() {
    runBlocking {
        withContext(Dispatchers.IO) {
            val data = readFromCache() ?: readFromRemote()
        }
    }
}

readFromCache returns a nullable type, so the Elvis operator ?: can be used on it directly. Because coroutines let asynchronous work be written as synchronous-style calls, any control statement can be written imperatively.

Combination logic

As an example of combination logic, Rx uses zip to combine two single-shot results into a new single-shot result:

fun test() {
    readFromRemoteRx().zipWith(readFromRemote2Rx()) { res, res2 -> 
        "$res & $res2"
    }.doOnSuccess { println("read from remote: $it") }
        .subscribeOn(Schedulers.io())
        .test()
}

/*
output:
-------------------
read from remote: data & data
*/

Coroutine implements the same logic very simply, using async + await with an imperative statement:

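fun test() {
    runBlocking {
        // Start both requests before awaiting, so they run concurrently like zip
        val res = async { readFromRemote() }
        val res2 = async { readFromRemote2() }
        val data = "${res.await()} & ${res2.await()}"
        println("read from remote: $data")
    }
}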
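Outside of runBlocking, the same combination can live in an ordinary suspend function. The sketch below is one way to do that with coroutineScope; the helper name readBoth is hypothetical, and readFromRemote is a simplified stand-in for the article's function.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the article's readFromRemote.
suspend fun readFromRemote(data: String = "data"): String {
    delay(100)
    return data
}

// Hypothetical helper: both requests are started first and awaited afterwards,
// so they overlap in time. coroutineScope returns only when both have finished.
suspend fun readBoth(): String = coroutineScope {
    val first = async { readFromRemote("a") }
    val second = async { readFromRemote("b") }
    "${first.await()} & ${second.await()}"
}

fun main() = runBlocking {
    println("read from remote: ${readBoth()}") // read from remote: a & b
}
```

If either request throws, coroutineScope cancels the other one, which is roughly the role error propagation plays in zip.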

Type conversion

Next, compare single-shot versus streaming data conversion.

Single > Streaming

Rx can convert a single type to an Observable using toObservable or flatMapObservable:

readFromCacheRx()
    .flatMapObservable { Observable.just(it) }
    .doOnNext { println("read from cache: $it") }
    .doOnComplete { println("complete") }
    .test()
    
/*
output:
-------------------
complete
*/

Since readFromCacheRx does not emit any data, there is no log output for doOnNext.

Converting single-shot data to a stream is very simple with coroutines: flow { … } is a builder for Flow, and suspend functions can be called directly inside it, with withContext used to switch threads around those calls if necessary.

runBlocking {
    flow { readFromCache()?.let { emit(it) } }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}
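One caveat when switching threads inside a flow builder: emit must not be called from inside withContext, because Flow requires emissions to happen in the collector's context. When the whole upstream should run elsewhere, flowOn is the usual tool. The sketch below assumes a simplified readFromCache and a hypothetical helper named cacheFlow.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the article's readFromCache.
suspend fun readFromCache(data: String? = null): String? {
    delay(100)
    return data
}

// Hypothetical helper: wraps the single-shot call in a Flow that emits
// zero or one item, and runs the upstream on the IO dispatcher.
fun cacheFlow(data: String?): Flow<String> =
    flow { readFromCache(data)?.let { emit(it) } }
        .flowOn(Dispatchers.IO)

fun main() = runBlocking {
    println(cacheFlow("cached").toList()) // [cached]
    println(cacheFlow(null).toList())     // []
}
```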

We often combine several single-shot results to implement business logic. For example, Rx uses merge to combine the read results from multiple data sources. When the local cache has data, it is emitted first, which helps the first screen display quickly.

Observable.merge(
    readFromCacheRx().toObservable(),
    readFromRemoteRx().toObservable()
).test()

The same logic can be implemented in Flow based on suspend functions.

flowOf(
    flow { readFromCache()?.let { emit(it) } },
    flow { emit(readFromRemote()) }
)
    .flattenMerge()
    .collect { println("$it") }

Streaming > Single

In Rx we can convert an Observable to Single data:

fun test() {
    Observable.just(1, 2, 3)
        .toList()
        .doOnSuccess { println("$it") }
        .test()

    Observable.just(1, 2, 3)
        .firstOrError() // in RxJava 2+, first() requires a default item
        .doOnSuccess { println("$it") }
        .test()
}

/*
output:
----------
[1, 2, 3]
1
*/

Flow also provides similar operators, such as first and toList. They are terminal operators that return the unwrapped value directly, so no separate collect call is needed:

val data = flowOf(1, 2, 3).toList()
println("$data")
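To make the "unwrapped value" point concrete, here is a small sketch of the terminal operators first, firstOrNull, and toList from kotlinx.coroutines.flow. The helper name firstAndAll is hypothetical.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a cold Flow can be collected more than once,
// so both terminal operators run against the same source.
suspend fun firstAndAll(source: Flow<Int>): Pair<Int?, List<Int>> =
    source.firstOrNull() to source.toList()

fun main() = runBlocking {
    val head: Int = flowOf(1, 2, 3).first() // plain Int, no wrapper type
    println(head)                           // 1
    println(firstAndAll(flowOf(1, 2, 3)))   // (1, [1, 2, 3])
    println(firstAndAll(emptyFlow()))       // (null, [])
}
```

Note that first throws NoSuchElementException on an empty flow, which is why the helper uses firstOrNull.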

Streaming > Single > streaming

In some business scenarios, multiple conversions such as streaming > single > streaming may be required, often involving asynchronous conversions such as flatMap or concatMap.

Observable.just(1, 3, 5)
    .concatMapSingle { readFromRemoteRx("$it") }
    .doOnComplete { println("complete") }
    .subscribe { println("next: $it") }

/*
output:
---------------------
read from remote: 1
next: 1
read from remote: 3
next: 3
read from remote: 5
next: 5
complete
*/


In the example above, we make three single-shot requests in sequence within one data stream and emit each result. Besides the serial concatMapSingle, Rx also provides a parallel counterpart, flatMapSingle. The same logic can be implemented with Flow as follows:

runBlocking {
    flowOf(1, 3, 5)
        .flatMapConcat { flow { emit(readFromRemote("$it")) } }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}

Flow's flatMapConcat does the same job as concatMap in Rx: the flows produced by the transformation are collected one after another, in serial. Flow also provides flatMapMerge for parallel scenarios, which is equivalent to flatMap in Rx. For clearer naming, Flow's own flatMap method has been deprecated in favor of the explicitly named flatMapConcat and flatMapMerge.

flatMapConcat and flatMapMerge build a Flow each time they transform an item, which is unnecessary overhead for single-shot data, so we can use map to simplify:

runBlocking {
    flowOf(1, 3, 5)
        .map { readFromRemote("$it") }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}

The effect is equivalent to flatMapConcat. Note that map cannot be used for parallel scenarios, even if you switch to another thread inside map. Because a suspend function can be called inside Flow's map { }, asynchronous logic can be implemented there with coroutines, whereas Rx's map can only execute synchronously. Some people therefore compare Flow's map to Rx's flatMap, but this is inaccurate: Flow's map does not process the stream in parallel, and it suspends until the current item has been handled before moving on to the next.
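The difference between serial map and parallel flatMapMerge can be observed through the order of results. In the sketch below, slowEcho is a hypothetical request whose latency equals its argument, and both helper names are made up for illustration. flatMapMerge is marked as a preview/experimental API in kotlinx.coroutines, hence the opt-in.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical request: takes `ms` milliseconds and returns `ms`.
suspend fun slowEcho(ms: Long): Long {
    delay(ms)
    return ms
}

// map suspends on each item in turn, so results keep the input order.
suspend fun viaMap(): List<Long> =
    flowOf(300L, 100L, 200L).map { slowEcho(it) }.toList()

// flatMapMerge collects the inner flows concurrently, so results
// arrive in completion order instead.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun viaFlatMapMerge(): List<Long> =
    flowOf(300L, 100L, 200L)
        .flatMapMerge { flow { emit(slowEcho(it)) } }
        .toList()

fun main() = runBlocking {
    println(viaMap())          // [300, 100, 200]
    println(viaFlatMapMerge()) // fastest request first
}
```

The serial version also takes roughly the sum of the delays, while the merged version takes roughly the longest one.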

Streaming > Completable

Rx also provides the Completable type, which lets us insert logic that returns no result into stream processing, as in the following scenario:

fun saveToCacheRx(data: String): Completable {
    return Completable
        .fromAction { println("saved to cache: $data") }
        .delay(100, TimeUnit.MILLISECONDS)
}

Observable.just(1, 2, 3)
    .flatMapCompletable { saveToCacheRx("$it") }
    .doOnComplete { println("complete") }
    .subscribe() // a Completable has no onNext, so there is nothing to print per item

/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/

saveToCacheRx simulates storing data. A Completable carries no return value and only signals that the store has finished, so there is no next in the log, only the final complete.

How does Flow implement the same logic?

suspend fun saveToCache(data: String) {
    delay(100)
    println("saved to cache: $data")
}

runBlocking {
    flowOf(1, 2, 3)
        .flatMapMerge { flow<String> { saveToCache("$it") } }
        .onCompletion { println("complete") }
        .collect { println("next: $it") }
}

/*
output:
-------------------
saved to cache: 1
saved to cache: 2
saved to cache: 3
complete
*/

As above, the suspend function saveToCache returns no value. Once the suspend function called inside flow { … } has finished, the Flow simply continues with its subsequent execution; no onComplete-style notification is needed as in Rx. Since the inner flows never emit a value, the next log is not printed.
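When parallel saving is not required, a side-effecting suspend call can also be attached with onEach, without building an inner Flow per item. This is a sequential variant, not a drop-in equivalent of the flatMapMerge version above; the savedItems list and the saveAll helper are hypothetical names used only for this sketch.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory stand-in for a cache.
val savedItems = mutableListOf<String>()

suspend fun saveToCache(data: String) {
    delay(100)
    savedItems += data
    println("saved to cache: $data")
}

// Hypothetical helper: saves each item in order, one at a time,
// then prints "complete" when the upstream has finished.
suspend fun saveAll(source: Flow<Int>) {
    source
        .onEach { saveToCache("$it") }
        .onCompletion { println("complete") }
        .collect()
}

fun main() = runBlocking {
    saveAll(flowOf(1, 2, 3))
}
```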

Conclusion

In the Java era, the language's limitations meant RxJava also had to handle single-shot data; in the Kotlin era, suspend functions are enough for that. Flow is not the best tool for single-shot data, so we should avoid overusing Flow when choosing a solution. Given the power of Kotlin and coroutines, we can expect RxJava's use cases to keep shrinking.