• RxJava vs. Kotlin Coroutines, a quick look
  • By David Karnok
  • The Nuggets translation Project
  • Permanent link to this article: github.com/xitu/gold-m…
  • Translator: PhxNirvana
  • Proofreader: Jamweak, Jerry-Shao

The introduction

Does Kotlin’s coroutines take the shine off RxJava and reactive programming? The answer depends on whom you ask. Fanatics and marketers don’t hesitate to say yes. If that were the case, developers would sooner or later rewrite Rx code in coroutines, or write it in coroutines from the start. Because coroutines are still experimental, current deficiencies, such as performance bottlenecks, will gradually be resolved. Therefore, the focus of this article is on ease of use rather than native performance.

The project design

Suppose you have two functions, f1 and f2, that mimic untrusted services, both of which return a number after a delay. Call both functions, sum up their return values and present them to the user. However, if it does not return within 500ms, it is no longer expected to return a value, so we cancel and retry the request a limited number of times until we finally abandon the request.

Coroutine

Coroutines work much like traditional ExecutorService – and Futuer-based toolsets, except that the underlying coroutines use suspension, state machines, and task scheduling instead of thread blocking.

First, write two functions to implement deferred operations:

suspend fun f1(i: Int) {
    Thread.sleep(if(i ! = 2) 2000Lelse 200L)
    return 1;
}

suspend fun f2(i: Int) {
    Thread.sleep(if(i ! = 2) 2000Lelse 200L)
    return 2;
}
Copy the code

Functions related to coroutine scheduling need to be called with the suspend keyword and from the coroutine context. For demonstration purposes, the function will delay 2s if the argument passed is not 2. This will cause the timeout detection to end and, on the third attempt, succeed within the specified time.

Because asynchrony always leaves the main thread at the end, we need a way to block the business logic before it completes to prevent a direct exit from the JVM. To do this, you can use runBlocking to call a function on the main thread.

fun main(arg: Array<string>) = runBlocking <unit>{

     coroutineWay()

     reactiveWay()
}

suspend func coroutineWay() {
    // TODO implement
}

func reactiveWay() {
    // TODO implement
}</unit> </string>
Copy the code

Compared to RxJava functions, the code written with coroutines is much cleaner in logic, and the code looks linear and synchronous.

suspend fun coroutineWay() {
    val t0 = System.currentTimeMillis()

    var i = 0;
    while (true) {                                       // (1)
        println("Attempt " + (i + 1) + " at T=" +
            (System.currentTimeMillis() - t0))

        var v1 = async(CommonPool) { f1(i) }             // (2)
        var v2 = async(CommonPool) { f2(i) }

        var v3 = launch(CommonPool) {                    // (3)
            Thread.sleep(500)
            println(" Cancelling at T=" +
                (System.currentTimeMillis() - t0))
            val te = TimeoutException();
            v1.cancel(te);                               // (4)
            v2.cancel(te);
        }

        try {
            val r1 = v1.await();                         // (5)
            val r2 = v2.await();
            v3.cancel();                                 // (6)
            println(r1 + r2)
            break;                                       
        } catch (ex: TimeoutException) {                 // (7)
            println(" Crash at T=" +
                (System.currentTimeMillis() - t0))
            if (++i > 2) {                               // (8)
                throw ex;
            }
        }
    }
    println("End at T=" 
        + (System.currentTimeMillis() - t0))             // (9)

}
Copy the code

Some of the output is added to see how the code works.

  1. In the case of linear programming, there is no quick way to retry an operation directly, so we need to set up a loop and retry counter I.
  2. Async (CommonPool) is used to perform asynchronous operations. This function can be started and executed immediately on some background thread. This function returns a Deferred value, which will be used later. If await() is used to get v1 as the final value, the current thread will hang and the calculation of v2 will not start until the previous resume is performed. In addition, we need a method to cancel the current operation in case of a timeout. Refer to Steps 3 and 5.
  3. If we want both operations to time out, it looks like we have to wait on another asynchronous thread. The launch(CommonPool) method returns a Job object that can be used in this case. Unlike async, this execution does not return a value. The returned Job is saved because the previous asynchronous operation may return in time and no longer need to cancel the operation.
  4. In the timed out task, we cancel v1 and v2 with TimeoutException, which will resume any operations that have been suspended waiting for both to return.
  5. Wait for both functions to run. If the timeout occurs, await will re-throw the exception used in step 4.
  6. If there are no exceptions, cancel the timeout task that no longer needs to be executed and break out of the loop.
  7. If there is a timeout, the exception is stereotypically caught and a status check is performed to determine the next step. Note that any other exceptions are thrown directly and exit the loop.
  8. In case it’s the third or more attempts, just throw the exception and do nothing.
  9. If everything goes according to script, print the total running time and exit the current function.

It seems simple enough, although the cancellation mechanism could make big news: what if V2 crashes due to some other exception (such as a network IOException)? Of course we have to handle these situations to make sure that tasks can be cancelled under various circumstances (for example, try resources in Kotlin?). . However, this happens in the context of v1 returning in time and not being able to cancel v1 or detect a crash of V2 until we try await.

Never mind the details, the program runs, and the result is as follows:

Attempt 1 at T=0
    Cancelling at T=531
         Crash at T=2017
Attempt 2 at T=2017
    Cancelling at T=2517
         Crash at T=4026
Attempt 3 at T=4026
3
End a
Copy the code

Three attempts were made, and the last one was successful, with a value of 3. Is it exactly like the script? Not at all! We can see the approximate time of the cancellation event, approximately 500 ms after two unsuccessful requests, whereas the exception catch occurred approximately 2000 ms later! We know cancel() was called successfully because we caught the exception. However, it appears that Thread.sleep() in the function is not broken or, in coroutine parlance, recovered when the exception is broken. This could be part of CommonPool, the call to Future.cancel(false) is in the infrastructure, or simply a program restriction.

responsive

Let’s look at how RxJava 2 does the same thing. Unfortunately, functions cannot be called normally if they are suspended, so we have to rewrite the two functions in a normal way:

fun f3(i: Int) : Int {
    Thread.sleep(if(i ! = 2) 2000Lelse 200L)
    return 1
}

fun f4(i: Int) : Int {
    Thread.sleep(if(i ! = 2) 2000Lelse 200L)
    return2}Copy the code

To match the ability to block the external environment, we use BlockingScheduler in RxJava 2 Extensions to provide the ability to return to the main thread. As the name implies, it blocks the initial caller/main thread until a task is committed and run via the scheduler.

fun reactiveWay() {
    RxJavaPlugins.setErrorHandler({ })                         // (1)

    val sched = BlockingScheduler()                            // (2)
    sched.execute {
        val t0 = System.currentTimeMillis()
        val count = Array<Int>(1, { 0 })                       // (3)

        Single.defer({                                         // (4)
            val c = count[0]++;
            println("Attempt " + (c + 1) +
                " at T=" + (System.currentTimeMillis() - t0))

            Single.zip(                                        // (5)
                    Single.fromCallable({ f3(c) })
                        .subscribeOn(Schedulers.io()),
                    Single.fromCallable({ f4(c) })
                        .subscribeOn(Schedulers.io()),
                    BiFunction<Int, Int> { a, b -> a + b }               // (6)
            )
        })
        .doOnDispose({                                         // (7)
            println(" Cancelling at T=" + 
                (System.currentTimeMillis() - t0))
        })
        .timeout(500, TimeUnit.MILLISECONDS)                   // (8)
        .retry({ x, e ->
            println(" Crash at " + 
                (System.currentTimeMillis() - t0))
            x < 3 && e is TimeoutException                     // (9)
        })
        .doAfterTerminate { sched.shutdown() }                 // (10)
        .subscribe({
            println(it)
            println("End at T=" + 
                (System.currentTimeMillis() - t0))             // (11)
        },
        { it.printStackTrace() })
    }
}
Copy the code

It’s a bit long to implement, and it might look a bit scary to those unfamiliar with Lambda.

  1. It is well known that RxJava 2 passes exceptions no matter what. On Android, cannot pass exceptions can make the application crashes, unless you use RxJavaPlugins. SetErrorHandler to capture. At this point, because we knew that the cancellation event would interrupt Thread.sleep(), the call stack would be a mess and we wouldn’t notice so many exceptions.
  2. Set up BlockingScheduler and distribute the first task to execute, along with the remaining main thread execution logic. This is because once locked, start() adds a live lock state to the main thread until any subsequent events break the lock.
  3. Set a heap variable to record the number of retries.
  4. Once there is a subscription through Single.defer, the counter plus prints the Attempt string. This operator allows the state of each subscription to be preserved, which is what we expect from the Retry () operator we execute downstream.
  5. Use the ZIP operator to perform the evaluation of two elements asynchronously, both of which perform their own functions in background threads.
  6. When both are complete, add the results.
  7. To cancel the timeout, use the doOnDispose operator to print the current state and time.
  8. Use the timeout operator to define a timeout for the sum. A TimeoutException is sent if a timeout occurs (such as when there is no feedback in this scenario).
  9. The retry operator overload provides the retry time as well as the current error. After printing an error, you should return true — that is, you must retry — if the number of retries is less than three and the current error is a TimeoutException. Any other errors will only terminate rather than trigger a retry.
  10. Once done, we need to close the scheduler to free the main thread and exit the JVM.
  11. Of course, we need to print the sum and the total operation time before we finish.

It might be argued that this is much more complicated than the implementation of coroutines. But… At least it ran:

Cancelling at T=4527 Attempt 1 at T=72 Cancelling at T=587 Crash at 587 Attempt 2 at T=587 Cancelling at T=1089 Crash at  1090 Attempt 3 at T=1090 Cancelling at T=1291 3 End at T=1292Copy the code

Interestingly, Cancelling at T=4527 is printed when calling coroutineWay() if both functions are called simultaneously in main: Even though there is no time consumption at all in the end, the cancellation event itself is wasted on computations that cannot be stopped, thus adding additional cost to cancelling tasks that have already been completed.

RxJava, on the other hand, at least cancels and retries functions in a timely manner. However, there is also an almost unnecessary Cancelling at T=1291 printed out. In Single. Timeout this is done: If this is done without delay, the internal CompositeDisposable agents the upstream Disposable and cancels it along with the operator, regardless of the actual condition of the operator.

conclusion

Finally, let’s look at the power of reactive design with one small improvement: why retry the whole process when all we need to do is retry functions that are not responding? Improvements can also be easily found in RxJava: put doOnDispose().timeout().retry() in each function call chain (perhaps transfomer can avoid code duplication) :

val timeoutRetry = SingleTransformer<Int, Int> { 
    it.doOnDispose({
        println(" Cancelling at T=" + 
            (System.currentTimeMillis() - t0))
    })
    .timeout(500, TimeUnit.MILLISECONDS)
    .retry({ x, e ->
        println(" Crash at " + 
            (System.currentTimeMillis() - t0))
        x < 3 && e is TimeoutException
    })
}

// ...

Single.zip(
    Single.fromCallable({ f3(c) })
        .subscribeOn(Schedulers.io())
        .compose(timeoutRetry)
    ,
    Single.fromCallable({ f4(c) })
        .subscribeOn(Schedulers.io())
        .compose(timeoutRetry)
    ,
    BiFunction<Int, Int> { a, b -> a + b }
)
// ...
Copy the code

Readers are welcome to get their hands on it and update the implementation of the coroutine to achieve the same behavior (try various other forms of cancellation mechanisms, incidentally). One of the nice things about reactive programming is that in most cases you don’t have to worry about annoying things like threads, the passing of cancel information, and the structure of operators. Libraries such as RxJava have designed their apis and encapsulated the underlying problems, and often programmers just need to use them.

So, does coroutine work at all? Of course it does, but in general, I think performance is a huge limitation, and I wonder what coroutines can do to replace reactive programming in general.


The Nuggets Translation Project is a community that translates quality Internet technical articles from English sharing articles on nuggets. The content covers Android, iOS, React, front-end, back-end, product, design and other fields. If you want to see more high-quality translation, please continue to pay attention to the Project, official Weibo, Zhihu column.