The author's blog

http://www.jianshu.com/u/c50b715ccaeb

Preface

The previous tutorial covered the basics of using RxJava2. In this section, we’ll learn about RxJava’s powerful thread control.

Getting to the point

We continue with the previous example of two water pipes, upstream and downstream:


Normally, the upstream and the downstream work on the same thread: whichever thread the upstream sends events on is the thread the downstream receives them on.


By default, all code in an Activity runs on the main thread. For example, we can print the name of the current thread in onCreate:
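As a minimal sketch, the same check looks like this in plain Java. The class and method names here are made up for illustration. In an Activity, the Thread.currentThread().getName() call would sit in onCreate() and would typically be printed with Log.d() rather than System.out.

```java
// Minimal sketch in plain Java (no Android dependency).
class CurrentThreadDemo {

    // Returns the name of whichever thread executes this method.
    static String currentThreadName() {
        return Thread.currentThread().getName();
    }

    // Runs currentThreadName() on a freshly started thread with the given
    // name and returns what that thread reported.
    static String nameSeenOnThread(String threadName) {
        final String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = currentThreadName(), threadName);
        worker.start();
        try {
            worker.join(); // wait until the worker has stored its name
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("called directly:  " + currentThreadName());
        System.out.println("called on worker: " + nameSeenOnThread("demo-worker"));
    }
}
```

The call always reports the thread that executes it, which is why it is a handy probe for the experiments that follow.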

The result is the name of the Android main thread, main.

Back in RxJava, when we create an upstream Observable in the main thread to send events, the upstream Observable sends events in the main thread by default.


When we create a downstream Observer in the main thread to receive events, the downstream Observer receives events in the main thread by default.

Let's create the upstream and the downstream separately in the main thread, connect them, and print the thread each of them runs on.

This validates that upstream and downstream work on the same thread by default.
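A minimal sketch of this experiment, assuming RxJava 2 (the io.reactivex packages) is on the classpath. The class name SameThreadDemo is hypothetical, and the thread names are returned instead of logged so that they can be compared.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

class SameThreadDemo {

    // Connects an upstream and a downstream without any scheduler.
    // Returns {thread that sent the event, thread that received it}.
    static String[] run() {
        final String[] threads = new String[2];

        Observable<Integer> upstream = Observable.create(emitter -> {
            threads[0] = Thread.currentThread().getName(); // sending thread
            emitter.onNext(1);
            emitter.onComplete();
        });

        Observer<Integer> downstream = new Observer<Integer>() {
            @Override public void onSubscribe(Disposable d) { }
            @Override public void onNext(Integer value) {
                threads[1] = Thread.currentThread().getName(); // receiving thread
            }
            @Override public void onError(Throwable e) { }
            @Override public void onComplete() { }
        };

        upstream.subscribe(downstream); // everything runs on the calling thread
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("upstream sent on:       " + threads[0]);
        System.out.println("downstream received on: " + threads[1]);
    }
}
```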


This is clearly not enough for our needs. What we usually want is to do time-consuming work on a background thread and then return to the main thread to update the UI, as the following figure illustrates:


In this figure, yellow pipes represent background threads and dark blue pipes represent the main thread.


To do this, we need to make the upstream send events on a background thread and make the downstream receive them on the main thread. RxJava's built-in thread schedulers make this easy. Let's look at some code:

It's the same example as before, with just a little added. Let's look at the result:

As you can see, the upstream thread that sends the event does change. The event is sent in a thread called RxNewThreadScheduler-2, while the downstream thread still receives the event in the main thread. This shows that our purpose is achieved.
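The following is a sketch of the same idea that runs on a plain JVM, assuming RxJava 2 is on the classpath. Because AndroidSchedulers.mainThread() needs the Android main looper, Schedulers.single() is used here as a stand-in for the main thread; on Android you would pass AndroidSchedulers.mainThread() to observeOn() instead. The class name and the CountDownLatch (which only keeps the caller waiting for the asynchronous result) are illustrative additions.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ThreadSwitchDemo {

    // Returns {thread that sent the event, thread that received it}.
    static String[] run() {
        final String[] threads = new String[2];
        final CountDownLatch done = new CountDownLatch(1);

        Observable.<Integer>create(emitter -> {
                    threads[0] = Thread.currentThread().getName();
                    emitter.onNext(1);
                    emitter.onComplete();
                })
                .subscribeOn(Schedulers.newThread()) // upstream sends on a new thread
                .observeOn(Schedulers.single())      // stand-in for AndroidSchedulers.mainThread()
                .subscribe(
                        value -> threads[1] = Thread.currentThread().getName(),
                        error -> done.countDown(),
                        done::countDown);

        try {
            done.await(5, TimeUnit.SECONDS); // wait for the asynchronous chain to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("upstream sent on:       " + threads[0]);
        System.out.println("downstream received on: " + threads[1]);
    }
}
```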


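Compared with the previous code, only two calls were added: subscribeOn(), here with Schedulers.newThread(), and observeOn(), here with AndroidSchedulers.mainThread().

Since this is a beginner's tutorial, I won't post a lot of source code for analysis. The goal is simply to know how to use these calls, so it is enough to remember a few points.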


In simple terms, subscribeOn() specifies the upstream thread that sends the event, and observeOn() specifies the downstream thread that receives the event.

If the upstream thread is specified several times, only the first specification takes effect. In other words, only the first call to subscribeOn() counts and the rest are ignored.


The downstream thread, on the other hand, can be specified several times: every call to observeOn() switches the downstream thread again.


Here’s an example:

This code specifies two upstream threads that send events, newThread and IO thread, and two downstream threads, main and IO thread. The running results are as follows:

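As you can see, although the upstream thread was specified twice, only the first specification took effect and the events are still sent on an RxNewThreadScheduler thread, while the downstream ends up on an RxCachedThreadScheduler thread. This cached thread is simply one of the threads in the IO thread pool.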
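The upstream half of this observation can be checked in isolation. The sketch below assumes RxJava 2 on the classpath; the class name is hypothetical, and fromCallable() is used only because it makes it easy to return the name of the thread that produced the value.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

class FirstSubscribeOnWinsDemo {

    // Returns the name of the thread on which the source produced its value.
    static String emittingThread() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.newThread()) // first call: decides where the source runs
                .subscribeOn(Schedulers.io())        // second call: does not move the source
                .blockingFirst();                    // wait for the single value
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + emittingThread());
    }
}
```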


To get a clearer view of how the downstream thread switches, let's add some logging:

After each downstream thread switch, we print the current thread. The result:

As you can see, the thread switches every time observeOn() is called, so we know what to do when we have a similar requirement.
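A sketch of such a logging chain, again assuming RxJava 2 on a plain JVM. Schedulers.single() stands in for AndroidSchedulers.mainThread(), the class name is hypothetical, and doOnNext() is one way to look at the current thread between two observeOn() calls.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ObserveOnSwitchDemo {

    // Returns one log line per observeOn() call, in call order.
    static List<String> run() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());

        Observable.just(1)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.single()) // stand-in for AndroidSchedulers.mainThread()
                .doOnNext(v -> log.add("after first observeOn: " + Thread.currentThread().getName()))
                .observeOn(Schedulers.io())
                .doOnNext(v -> log.add("after second observeOn: " + Thread.currentThread().getName()))
                .blockingSubscribe(); // block the caller until the chain completes

        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```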


RxJava already has many built-in scheduler options for us to choose from, for example:


  • Schedulers.io()

    Represents a thread for IO operations, typically used for networking, reading and writing files, and other IO-intensive work


  • Schedulers.computation()

    Represents CPU-intensive operations, such as operations that require a lot of computation


  • Schedulers.newThread()

    Represents a regular new thread


  • AndroidSchedulers.mainThread()

    Represents the main thread of Android


These built-in schedulers are sufficient for most development needs, so we should prefer them. RxJava maintains their threads internally with thread pools, which also keeps them efficient.
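To see which threads stand behind these options, a small probe can help. This is a sketch assuming RxJava 2 on the classpath; the class and method names are made up, and AndroidSchedulers.mainThread() is left out because it only works on Android.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

class SchedulerNamesDemo {

    // Runs a tiny task on the given scheduler and returns the name of the
    // thread that executed it.
    static String threadNameOn(Scheduler scheduler) {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("io():          " + threadNameOn(Schedulers.io()));
        System.out.println("computation(): " + threadNameOn(Schedulers.computation()));
        System.out.println("newThread():   " + threadNameOn(Schedulers.newThread()));
    }
}
```

In RxJava 2 the thread names carry the prefixes RxCachedThreadScheduler, RxComputationThreadPool and RxNewThreadScheduler respectively, which makes log output easy to attribute to a scheduler.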


Practice

As Android developers, we often move time-consuming operations such as network requests, file reads and writes, and database operations to the background, and return to the main thread to update the UI once they finish. With the basics above, this is now easy to do.


Here are a few common scenarios.


Network request

There are only a handful of well-known networking libraries on Android, and Retrofit stands out among them partly because it can be called in the RxJava style. Here is how it works.


To use Retrofit, add Gradle configuration:

Then define the Api interface:

Then create a Retrofit client:

Making a request is simple:
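The three steps (interface, client, request) might look roughly like the sketch below. It assumes the retrofit, adapter-rxjava2 and rxjava 2 artifacts are on the classpath. The endpoint users/{id}, the method getUser and the class name RetrofitSketch are placeholders rather than the API used in the original article, and the raw ResponseBody is returned so that no JSON converter is needed.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import okhttp3.ResponseBody;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.http.GET;
import retrofit2.http.Path;

class RetrofitSketch {

    // Hypothetical API definition: path and method name are placeholders.
    interface Api {
        @GET("users/{id}")
        Observable<ResponseBody> getUser(@Path("id") String id);
    }

    // Creates the Retrofit client; the call adapter factory is what lets
    // interface methods return an Observable.
    static Api createApi(String baseUrl) {
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(baseUrl) // must end with "/"
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        return retrofit.create(Api.class);
    }

    // Builds the request; nothing is sent until someone subscribes.
    static Observable<String> fetchUser(Api api, String id) {
        return api.getUser(id)
                .subscribeOn(Schedulers.io())  // do the network call on an IO thread
                .map(ResponseBody::string);    // read the body while still on that thread
        // On Android, add .observeOn(AndroidSchedulers.mainThread()) before
        // subscribing so that the result is delivered on the main thread.
    }
}
```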

This looks perfect, but we have overlooked one thing: if the Activity exits while the request is in flight and we then return to the main thread to update the UI, the app will crash. What can we do? In the previous tutorial we talked about Disposable, which acts as a switch: when its dispose() method is called, it cuts the water pipe so that the downstream receives no more events. With no events arriving, the UI is no longer updated. So we can keep the Disposable in the Activity and cut the pipe when the Activity exits.


RxJava provides a built-in container for this, CompositeDisposable. Every time we obtain a Disposable, we call CompositeDisposable.add() to put it into the container, and when the Activity exits we call CompositeDisposable.clear() to cut off all the pipes at once.
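A sketch of this pattern, assuming RxJava 2 on the classpath. DisposableScreenDemo is a hypothetical stand-in for an Activity so that the example runs without Android; Observable.interval() merely plays the role of a long-running request.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an Activity.
class DisposableScreenDemo {

    private final CompositeDisposable disposables = new CompositeDisposable();

    // Starts some long-running work and remembers its Disposable,
    // as an Activity would do when it kicks off a request.
    Disposable startWork() {
        Disposable d = Observable.interval(100, TimeUnit.MILLISECONDS)
                .subscribe(tick -> { /* the UI would be updated here */ });
        disposables.add(d);
        return d;
    }

    // Counterpart of Activity.onDestroy(): cut off every pipe at once.
    void onDestroy() {
        disposables.clear();
    }

    // Number of Disposables currently held by the container.
    int pendingCount() {
        return disposables.size();
    }

    public static void main(String[] args) {
        DisposableScreenDemo screen = new DisposableScreenDemo();
        Disposable first = screen.startWork();
        screen.startWork();
        System.out.println("held before exit: " + screen.pendingCount());
        screen.onDestroy();
        System.out.println("first disposed:   " + first.isDisposed());
        System.out.println("held after exit:  " + screen.pendingCount());
    }
}
```

clear() disposes everything in the container but leaves the container reusable, whereas CompositeDisposable.dispose() also disposes any Disposable added afterwards.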


Reading and writing a database

Reading and writing a database is also a time-consuming operation, so it is best done on an IO thread as well. This example is relatively simple, so let's go straight to the code:
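The sketch below shows only the read side and assumes RxJava 2 on the classpath. There is no real database here: queryAllRecords() is a hypothetical stand-in that returns fixed rows, and lastQueryThread exists only to show where the query ran.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;

class DatabaseReadDemo {

    // Records which thread executed the most recent query (for demonstration).
    static volatile String lastQueryThread;

    // Hypothetical stand-in for a slow database query.
    static List<String> queryAllRecords() {
        lastQueryThread = Thread.currentThread().getName();
        return Arrays.asList("alice", "bob");
    }

    // Wraps the query so that it runs on an IO thread when subscribed.
    static Observable<List<String>> readAllRecords() {
        return Observable.fromCallable(DatabaseReadDemo::queryAllRecords)
                .subscribeOn(Schedulers.io());
        // On Android, add .observeOn(AndroidSchedulers.mainThread()) before
        // subscribing so that the rows arrive on the main thread.
    }

    public static void main(String[] args) {
        List<String> rows = readAllRecords().blockingFirst();
        System.out.println("rows: " + rows + ", queried on: " + lastQueryThread);
    }
}
```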