RxJava makes thread switching easy, so on Android it is often used in place of AsyncTask, Handler, and other native utility classes. It is simple to use, but if you don’t understand the principles behind it, it is just as easy to write bugs. This article gives a brief tour of how RxJava implements thread switching and what to watch out for during development.

1. Basic Usage

  • Scheduler

If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators to operate on particular Schedulers. A Scheduler decides which thread an operator runs on, and that is how multi-threaded scheduling is achieved.

  • observeOn

Specifies the Scheduler on which an observer will observe this Observable.

  • subscribeOn

Specifies the Scheduler on which the Observable itself will operate.

Each operator in an RxJava call chain creates a new Observable, and that new Observable registers a callback (an Observer) with the Observable upstream of it. subscribeOn and observeOn are built the same way:

  • subscribeOn subscribes to its upstream on the specified thread
  • observeOn receives the data and then calls the downstream callbacks (onNext/onError/onComplete, etc.) on the specified thread

RxJava establishes subscriptions from bottom to top and then sends data from top to bottom. That is why subscribeOn can determine the thread the data source runs on even when it appears after observeOn in the chain: subscription always happens before any data flows.
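The bottom-up subscription and top-down emission described above can be sketched in plain Java. This is a toy model rather than RxJava's own classes: Source, stage, and ChainOrderSketch are hypothetical names invented for the illustration, and each stage only records when it is subscribed to and when an item passes through it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of an operator chain (hypothetical types, not RxJava's classes). */
final class ChainOrderSketch {

    /** Anything that can be subscribed to with a downstream callback. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Wraps an upstream and records when it is subscribed to and when an item passes through. */
    static <T> Source<T> stage(String name, Source<T> upstream, List<String> trace) {
        return downstream -> {
            trace.add("subscribe " + name);      // happens while the subscription travels up
            upstream.subscribe(item -> {
                trace.add("onNext " + name);     // happens while the item travels down
                downstream.accept(item);
            });
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Source<Integer> source = downstream -> {
            trace.add("subscribe source");
            downstream.accept(1);                // emit a single item
        };
        // Chain layout: source -> A -> B -> final consumer
        Source<Integer> chain = stage("B", stage("A", source, trace), trace);
        chain.subscribe(item -> trace.add("received " + item));
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The trace lists subscribe B, subscribe A, subscribe source, and only then onNext A, onNext B, received 1: the subscription climbs from the last stage to the source before the first item travels back down.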

2. subscribeOn

2.1 Implementation Principles

The source code shows the basic principle behind subscribeOn's thread switching:

//ObservableSubscribeOn.java (abridged)
final class ObservableSubscribeOn<T> extends Observable<T> {

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        // subscribe() is not called directly here; it is dispatched via scheduler.scheduleDirect()
        parent.setDisposable(
            scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    final class SubscribeTask implements Runnable {
        @Override
        public void run() {
            // run() is called on the specified scheduler, so the thread has already
            // changed by the time we subscribe upstream.
            // The upstream is therefore guaranteed to run on that thread.
            source.subscribe(parent);
        }
    }

    static final class SubscribeOnObserver<T> implements Observer<T>, Disposable {

        @Override
        public void onNext(T t) {
            // No thread switch is performed when data is received
            actual.onNext(t);
        }
    }
}

2.2 subscribeOn takes effect only once

subscribeOn changes the thread on which the body of Observable.create runs by switching the thread that subscribes to it, and that in turn determines the thread that emits the data.

Because subscription proceeds from bottom to top, Observable.create is affected only by the subscribeOn nearest to it. When a call chain contains several subscribeOn calls, only the first one (the one closest to the source) decides where the source runs. The other subscribeOn calls can still change the thread on which the doOnSubscribe callbacks upstream of them execute.

@Test
fun test() {
    Observable.create<Unit> { emitter ->
        log("onSubscribe")
        emitter.onNext(Unit)
        emitter.onComplete()
    }.subscribeOn(namedScheduler("1 - subscribeOn"))
        .doOnSubscribe { log("1 - doOnSubscribe") }
        .subscribeOn(namedScheduler("2 - subscribeOn"))
        .doOnSubscribe { log("2 - doOnSubscribe") }
        .doOnNext { log("onNext") }
        .test().awaitTerminalEvent() // Wait until observable completes
 }
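Why only the subscribeOn nearest the source decides the emitting thread can be seen in a small plain-Java model. It assumes that a single-threaded executor stands in for each Scheduler; Source, subscribeOn, and named are hypothetical helpers written for this sketch, not RxJava's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Toy model of subscribeOn (hypothetical names, not RxJava's implementation). */
final class SubscribeOnSketch {

    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Subscribes to the upstream on the given executor; items pass through untouched. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** A single-threaded executor whose thread carries the given name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread on which the source body ran. */
    static String run() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        try {
            CompletableFuture<String> emittingThread = new CompletableFuture<>();
            // The source emits the name of the thread it was subscribed on.
            Source<String> source =
                downstream -> downstream.accept(Thread.currentThread().getName());
            // "first" is nearest to the source, "second" is further downstream.
            Source<String> chain = subscribeOn(subscribeOn(source, first), second);
            chain.subscribe(emittingThread::complete);
            return emittingThread.join();
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + run());
    }
}
```

The outer stage hops to the thread named second, but the inner stage immediately hops again to first before the source is subscribed to, so the source body runs on first.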

2.3 Correctly understand the meaning of subscribeOn

Adding .subscribeOn() is not always enough. The subscribeOn operator only moves the subscription process to the desired thread; that does not mean the items will be emitted on that thread. If the source hands emission off to another thread, the upstream data arrives from that other thread instead.

@Test
fun test() {
    val observable = Observable.create<Int> { emitter ->
        log("onSubscribe")
        thread(name = "Main thread", isDaemon = false) {
            log("1 - emitting"); emitter.onNext(1)
            log("2 - emitting"); emitter.onNext(2)
            log("3 - emitting"); emitter.onNext(3)
            emitter.onComplete()
        }
    }
    
    observable
        .subscribeOn(Schedulers.computation())
        .doOnNext { log("$it - after subscribeOn") }
        .test().awaitTerminalEvent() // Wait until observable completes
}

Understanding the meaning of subscribeOn correctly helps to avoid some misunderstandings in use:

Not valid for PublishSubject

@Test
fun test() {
    val subject = PublishSubject.create<Int>()
    val observer1 = subject
        .subscribeOn(Schedulers.io())
        .doOnNext { log("$it - I want this happen on an IO thread") }
        .test()
    val observer2 = subject
        .subscribeOn(Schedulers.newThread())
        .doOnNext { log("$it - I want this happen on a new thread") }
        .test()
    
    sleep(10); 
    subject.onNext(1)
    subject.onNext(2)
    subject.onNext(3)
    subject.onComplete()
    
    observer1.awaitTerminalEvent()
    observer2.awaitTerminalEvent()
}

For a PublishSubject, the thread that upstream data arrives on is whichever thread calls onNext, so applying subscribeOn to a PublishSubject has no effect on where items are emitted.

Not valid for Observable.just()

In general, subscribeOn decides the thread on which the body of Observable.create { … } runs. A common beginner mistake is to perform a time-consuming task inside Observable.just(…) and assume it will run on the subscribeOn thread, for example by writing Observable.just(readFromDb()).subscribeOn(Schedulers.io()).

Calling readFromDb() inside just() is clearly inappropriate. The argument is evaluated immediately on the current thread, before any subscription takes place, so subscribeOn cannot affect it. The code should be modified as follows:

//Observable.defer
Observable.defer { Observable.just(readFromDb()) }
    .subscribeOn(Schedulers.io())
    .subscribe { ... }

//Observable.fromCallable
Observable.fromCallable { readFromDb() }
    .subscribeOn(Schedulers.io())
    .subscribe { ... }
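The difference between just() and fromCallable/defer comes down to eager versus deferred evaluation, which the following plain-Java sketch illustrates without RxJava. The read helper is a hypothetical stand-in for readFromDb(), and a Supplier plays the role of the lambda passed to fromCallable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java illustration of eager versus deferred evaluation (no RxJava involved). */
final class EagerVsDeferredSketch {

    /** Hypothetical stand-in for a slow call such as readFromDb(). */
    static String read(String label, List<String> trace) {
        trace.add("read " + label);
        return label;
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();

        // Eager: the argument expression runs right here, like readFromDb() inside just(...).
        String eager = read("eager", trace);

        // Deferred: only a recipe is stored, like the lambda given to fromCallable or defer.
        Supplier<String> deferred = () -> read("deferred", trace);

        trace.add("subscribe");
        deferred.get();   // the read happens only now, on whichever thread calls get()
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The eager read is recorded before the subscribe marker and the deferred one after it. Only work that happens at subscription time can be moved to another thread by switching the subscribing thread.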

Use flatMap to handle concurrency

subscribeOn determines the thread on which the current Observable is subscribed to, so take care when combining it with flatMap:

Observable.fromIterable(listOf("id1", "id2", "id3"))
    .flatMap { id -> loadData(id) }
    .subscribeOn(Schedulers.io())
    .observeOn(mainThread())
    .toList()
    .subscribe { result -> log(result) }

This is incorrect if we want the loadData(id) calls to execute concurrently.

Here subscribeOn determines the thread upstream of flatMap, and the Observables returned inside flatMap are all subscribed to on that same thread. Unless loadData switches threads itself, the calls run one after another on a single thread and cannot run in parallel.

To achieve parallel execution, modify the code as follows:

Observable.fromIterable(listOf("id1", "id2", "id3"))
    .flatMap { id ->
        loadData(id)
            .subscribeOn(Schedulers.io())
    }
    .observeOn(mainThread())
    .toList()
    .subscribe { result -> log(result) }

3. observeOn

3.1 Implementation Principle

The source code shows the basic principle behind observeOn's thread switching:

//ObservableObserveOn.java (abridged)
final class ObservableObserveOn<T> extends Observable<T> {

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // Subscribe upstream directly, without switching threads;
            // the thread switch takes place in the Observer
            source.subscribe(
                new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    static final class ObserveOnObserver<T> implements Observer<T>, Runnable {

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            // Put the data into a queue to increase throughput and improve performance
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // schedule() performs the thread switch; the queue is then drained in a loop
            // and the downstream is called back, so it receives the data on the specified thread
            schedule();
        }

        void schedule() {
            if (this.getAndIncrement() == 0) {
                // Switch threads
                this.worker.schedule(this);
            }
        }
    }
}
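The enqueue-then-schedule pattern in onNext and schedule can be modeled in a few lines of plain Java. This is a simplified sketch under stated assumptions: ObserveOnSketch is a hypothetical class, a single-threaded ExecutorService stands in for the Worker, and a ConcurrentLinkedQueue replaces RxJava's internal queue. Error handling, completion, and disposal are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Toy model of the queue-drain pattern (hypothetical class, not RxJava's ObserveOnObserver). */
final class ObserveOnSketch<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();   // work-in-progress counter
    private final ExecutorService worker;
    private final Consumer<T> downstream;

    ObserveOnSketch(ExecutorService worker, Consumer<T> downstream) {
        this.worker = worker;
        this.downstream = downstream;
    }

    /** Called by the upstream on its own thread: enqueue, then make sure a drain is scheduled. */
    void onNext(T item) {
        queue.offer(item);
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);   // the thread switch happens here
        }
    }

    /** Runs on the worker thread and hands queued items to the downstream in order. */
    @Override
    public void run() {
        int missed = 1;
        do {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            // A non-zero result means more onNext calls arrived while draining.
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }

    static List<String> run(int count) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        ObserveOnSketch<Integer> observer = new ObserveOnSketch<Integer>(worker, item -> {
            seen.add(item + "@" + Thread.currentThread().getName());
            done.countDown();
        });
        for (int i = 0; i < count; i++) {
            observer.onNext(i);   // emitted from the calling thread
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        worker.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        run(5).forEach(System.out::println);
    }
}
```

The counter ensures that at most one drain is scheduled at a time, so items emitted by the caller are delivered downstream in order on the worker thread.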

3.2 observeOn takes effect every time

Unlike subscribeOn, observeOn can appear more than once in a chain, and every occurrence takes effect.

  • The thread that subscribeOn switches to can be observed with doOnSubscribe
  • The thread that observeOn switches to can be observed with doOnNext

3.3 Can serial transmission of multiple items be guaranteed?

After observeOn hands work to a Scheduler, does the downstream run on a single thread or on several? Is the order of downstream data guaranteed?

@Test
fun test() {
    Observable.create<Int> { emitter ->
        repeat(10) {
            emitter.onNext(it)
        }
        emitter.onComplete()
    }.observeOn(Schedulers.io())
        .subscribe {
            log("-$it")
        }
}

The result shows that even after being scheduled by a Scheduler, the downstream still runs on a single thread, which preserves the order of the data throughout the call chain.

So why does the downstream run on a single thread even though the Scheduler is backed by a thread pool?

4. Scheduler

4.1 Implementation Principle

A Scheduler does not schedule a Runnable directly. Instead it creates a Worker, and the Worker schedules the actual tasks.

SubscribeTask in subscribeOn and ObserveOnObserver in observeOn both implement Runnable, so both are ultimately executed by a Worker.

4.2 Tasks are scheduled by the Worker

A Scheduler can create multiple workers, and a Worker can manage multiple tasks (Runnable)

Workers exist to ensure two things:

  • Tasks scheduled on the same Worker execute serially, and tasks scheduled for immediate execution follow first-in, first-out order.
  • A Worker keeps track of the Runnables scheduled through it, so when the Worker is cancelled, all tasks based on it are cancelled too
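Both guarantees can be imitated with a single-threaded executor. The sketch below is a model built on assumptions, not RxJava's Scheduler.Worker: WorkerSketch, schedule, dispose, and finish are hypothetical names, and java.util.concurrent does the actual work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy Worker backed by one thread (hypothetical class, not RxJava's Scheduler.Worker). */
final class WorkerSketch {
    private final ExecutorService executor;

    WorkerSketch(String threadName) {
        executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
    }

    /** Queues a task; tasks of one worker run one after another in submission order. */
    void schedule(Runnable task) {
        executor.execute(task);
    }

    /** Cancels the worker: queued tasks are dropped and a running one is interrupted. */
    void dispose() {
        executor.shutdownNow();
    }

    /** Lets already queued tasks finish, then waits for the worker to stop. */
    void finish() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Guarantee 1: serial, first-in first-out execution on one thread. */
    static List<String> fifoDemo() {
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        WorkerSketch worker = new WorkerSketch("worker-1");
        for (int i = 1; i <= 3; i++) {
            int id = i;
            worker.schedule(() -> trace.add("task " + id + " on " + Thread.currentThread().getName()));
        }
        worker.finish();
        return trace;
    }

    /** Guarantee 2: cancelling the worker cancels the tasks that depend on it. */
    static List<String> disposeDemo() {
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        WorkerSketch worker = new WorkerSketch("worker-2");
        CountDownLatch gate = new CountDownLatch(1);   // never opened, so the first task blocks
        worker.schedule(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                // interrupted by dispose(); nothing to clean up in this sketch
            }
        });
        worker.schedule(() -> trace.add("second task ran"));
        worker.dispose();
        worker.finish();
        return trace;   // stays empty: the second task never ran
    }

    public static void main(String[] args) {
        System.out.println(fifoDemo());
        System.out.println(disposeDemo());
    }
}
```

In fifoDemo the three tasks run in submission order on worker-1. In disposeDemo the second task sits behind a blocked first task, and cancelling the worker discards it before it can run.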

4.3 How does Worker guarantee serial?

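Quite simply: a Worker executes its tasks on one thread at a time, and for built-in schedulers such as io() and computation() each Worker is backed by a single thread.

Now we can answer the earlier question: why does the downstream of observeOn still run on a single thread after being scheduled by a Scheduler?

observeOn creates its own Worker from the Scheduler for every subscription, so everything downstream of that observeOn is guaranteed to execute serially on a single thread.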

//ObservableObserveOn.java (abridged)
final class ObservableObserveOn<T> extends Observable<T> {

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(
                new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); // the Worker is passed in
        }
    }
    ...
}

As shown above, the Worker is held as a member variable of ObserveOnObserver.

4.4 Preset Schedulers

Just as Executors provides several kinds of ThreadPoolExecutor, Schedulers provides several preset Schedulers.

No. Schedulers & Descriptions

1 Schedulers.single()

A single, globally shared thread: no matter how many Observables use it, they all share this one thread.

2 Schedulers.io()

One of the most commonly used schedulers, intended for IO-related operations such as network requests and file operations. The IO scheduler is backed by a thread pool. It starts by creating a worker thread that can be reused for other operations, and when no idle worker thread is available (for example because of long-running tasks), it creates a new thread to handle the other operations.

3 Schedulers.computation()

Much like the IO scheduler, it is backed by a thread pool, but the number of threads is fixed and matches the number of CPU cores. When all threads are busy, new tasks have to wait. It is therefore unsuitable for IO-related operations and suited to computational work in which no single task occupies a thread for long.

4 Schedulers.newThread()

Creates a new thread each time it is used.

5 Schedulers.trampoline()

Executes on the current thread without switching threads.

6 Schedulers.from(java.util.concurrent.Executor executor)

Behaves like a customizable IO scheduler. Because you supply the Executor, you can create a thread pool of a size you choose. This is useful when there are too many Observables to hand over to the IO scheduler.

//Sample of Schedulers.from
fun namedScheduler(name: String): Scheduler {
    return Schedulers.from(
        Executors.newCachedThreadPool { Thread(it, name) }
    )
}

5. Thread Safety

5.1 Is the RxJava operator thread-safe?

@Test
fun test() {
    val numberOfThreads = 1000
    val publishSubject = PublishSubject.create<Int>()
    val actuallyReceived = AtomicInteger()

    publishSubject
        .take(300).subscribe {
            actuallyReceived.incrementAndGet()
        }

    val latch = CountDownLatch(numberOfThreads)
    var threads = listOf<Thread>()

    // Create exactly numberOfThreads threads so the count matches the latch
    (0 until numberOfThreads).forEach {
        threads += thread(start = false) {
            publishSubject.onNext(it)
            latch.countDown()
        }
    }

    // Start all threads so that onNext is called concurrently
    threads.forEach { it.start() }
    latch.await()

    val sum = actuallyReceived.get()
    check(sum == 300) { "$sum != 300" }
}

The result is not what we expected, because take is not thread-safe.

Take a look at the source code of take:

public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
    final long limit;

    public ObservableTake(ObservableSource<T> source, long limit) {
        super(source);
        this.limit = limit;
    }
    protected void subscribeActual(Observer<? super T> observer) {
        this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
    }

    static final class TakeObserver<T> implements Observer<T>, Disposable {
        final Observer<? super T> downstream;
        boolean done;
        Disposable upstream;
        long remaining;

        TakeObserver(Observer<? super T> actual, long limit) {
            this.downstream = actual;
            this.remaining = limit;
        }

        public void onNext(T t) {
            if (!this.done && this.remaining-- > 0L) {
                boolean stop = this.remaining == 0L;
                this.downstream.onNext(t);
                if (stop) {
                    this.onComplete();
                }
            }

        }
    }
}

Unsurprisingly, remaining-- is performed without any locking.
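To make the race concrete: remaining-- is a read, a decrement, and a write, so two threads entering onNext together can both see the same value and both pass the item on. The plain-Java sketch below uses a hypothetical LockedTakeSketch class and is not a patch to RxJava. It guards the check-and-decrement with a lock, and with the lock in place exactly limit items get through no matter how the calls interleave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy "take" stage with a locked check-and-decrement (not RxJava's TakeObserver). */
final class LockedTakeSketch {
    private long remaining;
    private final Runnable downstream;

    LockedTakeSketch(long limit, Runnable downstream) {
        this.remaining = limit;
        this.downstream = downstream;
    }

    /** May be called from many threads at once; passes on at most `limit` calls. */
    void onNext() {
        boolean pass;
        synchronized (this) {
            // Check and decrement happen as one step, so no two callers see the same value.
            pass = remaining > 0;
            if (pass) {
                remaining--;
            }
        }
        if (pass) {
            downstream.run();
        }
    }

    /** Fires one onNext per thread and returns how many calls reached the downstream. */
    static int run(int threadCount, int limit) {
        AtomicInteger received = new AtomicInteger();
        LockedTakeSketch take = new LockedTakeSketch(limit, received::incrementAndGet);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threads.add(new Thread(take::onNext));
        }
        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(run(200, 50));
    }
}
```

RxJava's operators skip this kind of locking on purpose and expect the caller to deliver notifications serially instead.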

5.2 Thread safety of observeOn

observeOn delivers items serially, so can placing it before take make take run on a single thread and solve the problem?

@Test
fun test() {
    repeat(10000) {
        val numberOfThreads = 1000
        val publishSubject = PublishSubject.create<Int>()
        val actuallyReceived = AtomicInteger()

        publishSubject
            .observeOn(Schedulers.io())
            .take(300).subscribe {
                actuallyReceived.incrementAndGet()
            }

        val latch = CountDownLatch(numberOfThreads)
        var threads = listOf<Thread>()

        // Create exactly numberOfThreads threads so the count matches the latch
        (0 until numberOfThreads).forEach {
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }

        threads.forEach { it.start() }
        latch.await()

        check(actuallyReceived.get() == 300)
    }
}

Unfortunately, the problem persists after several runs. observeOn itself is not thread-safe: the queue it uses internally is not safe for concurrent producers, so calling onNext from several threads at once still breaks it.

5.3 The Observable Contract

Rx states this explicitly in the definition of Observable:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. Reactivex.io /documentati…

In conclusion, RxJava operators are not thread-safe by default; they rely on the upstream honoring this contract.

However, operators that accept multiple Observables, such as merge(), combineLatest(), and zip(), are thread-safe, so there is no need to worry about thread safety even when those Observables emit from different threads.