Outline

[TOC]

preface

The previous tutorial covered the basics of using RxJava2. In this section, we’ll learn about RxJava’s powerful thread control.

To the chase

Using the previous example, two pipes:

RxJava

Normally, upstream and downstream work in the same thread, which means that the upstream thread sends events and the downstream thread receives events.

By default, all activities of an Activity run in the main thread. For example, we type the name of the current thread in onCreate:

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Log.d(TAG, Thread.currentThread().getName());
    }Copy the code

The result:

D/TAG: mainCopy the code

Back in RxJava, when we create an upstream Observable in the main thread to send events, the upstream Observable sends events in the main thread by default.

When we create a downstream Observer in the main thread to receive events, the downstream Observer receives events in the main thread by default.

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1); }}); Consumer<Integer> consumer =new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: "+ integer); }}; observable.subscribe(consumer); }Copy the code

Create upstream and downstream separately in the main thread, then concatenate them together and print out their respective threads.

D/TAG: Observable thread is : main
D/TAG: emit 1                     
D/TAG: Observer thread is :main   
D/TAG: onNext: 1Copy the code

This validates that upstream and downstream work on the same thread by default.

This is definitely not enough for our needs. We would rather have a situation where we do time-consuming operations in the child thread and then go back to the main thread to work on the UI, as illustrated by the following image:

thread.png

In this figure, we use yellow pipes to represent child threads and dark blue pipes to represent the main thread.

To do this, we need to change the upstream thread that sends the event to send the event in the child thread, and then change the downstream thread to receive the event in the main thread. This can be done easily with RxJava’s built-in thread scheduler. Let’s look at some code:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1); }}); Consumer<Integer> consumer =new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: "+ integer); }}; observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer); }Copy the code

It’s the same example as before, but we added a little bit too much. Let’s look at the result:

 D/TAG: Observable thread is : RxNewThreadScheduler-2  
 D/TAG: emit 1                                         
 D/TAG: Observer thread is :main                       
 D/TAG: onNext: 1Copy the code

As you can see, the upstream thread that sends the event does change. The event is sent in a thread called RxNewThreadScheduler-2, while the downstream thread still receives the event in the main thread. This shows that our purpose is achieved.

This is just two more lines than the previous code:

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())Copy the code

As a beginner’s tutorial, will not post a lot of source code to analyze, so only need to let everyone remember a few points, has achieved how to use this purpose is our goal.

In simple terms, subscribeOn() specifies the upstream thread that sends the event, and observeOn() specifies the downstream thread that receives the event.

The threads that are specified upstream multiple times are only valid for the first time, that is, the calls to subscribeOn() are only valid for the first time and the rest are ignored.

It is possible to specify the downstream thread multiple times, meaning that every time observeOn() is called, the downstream thread will switch.

Here’s an example:

 observable.subscribeOn(Schedulers.newThread())     
         .subscribeOn(Schedulers.io())              
         .observeOn(AndroidSchedulers.mainThread()) 
         .observeOn(Schedulers.io())                
         .subscribe(consumer);Copy the code

This code specifies two upstream threads that send events, newThread and IO thread, and two downstream threads, main and IO thread. The running results are as follows:

D/TAG: Observable thread is : RxNewThreadScheduler-3
D/TAG: emit 1                                       
D/TAG: Observer thread is :RxCachedThreadScheduler-1
D/TAG: onNext: 1Copy the code

RxNewThreadScheduler (RxCachedThreadScheduler); RxCachedThreadScheduler (RxCachedThreadScheduler) The CacheThread is actually one of the IO thread pools.

To get a clearer view of the downstream thread switching process, let’s add log:

     observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
                    }
                })
                .subscribe(consumer);Copy the code

After the downstream thread switch, we print the current thread and run the result:

D/TAG: Observable thread is : RxNewThreadScheduler-1                                             
D/TAG: emit 1                                                                                    
D/TAG: After observeOn(mainThread), current thread is: main                                      
D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2                        
D/TAG: Observer thread is :RxCachedThreadScheduler-2                                             
D/TAG: onNext: 1Copy the code

As you can see, the observeOn() thread switches every time we call it, so we know what to do if we have a similar requirement.

In RxJava, there are already many built-in threading options for us to choose from, for example

  • Schedulers.io() represents a thread for IO operations, which are typically used for IO-intensive operations such as networking, reading and writing files
  • Schedulers.computation() represents CPU-intensive operations, such as operations that require a lot of computation
  • Schedulers.newthread () represents a regular newThread
  • AndroidSchedulers. MainThread () represents the main thread of Android

These built-in schedulers are sufficient for our development needs, so we should use these built-in options, and use thread pools to maintain these threads internally in RxJava, all more efficiently.

practice

For us Android developers, often put some time-consuming operations in the background, such as network requests or read and write files, database operations and so on, wait until the operation is finished back to the main thread to update the UI, with the above foundation, so now we can easily do some operations.

Here are a few common scenarios.

Network request

There are only a few well-known web request libraries on Android, and Retrofit stands out from the rest because it supports RxJava invocation. Here’s how it works.

To use Retrofit, add Gradle configuration:

    //retrofit
    compile 'com. Squareup. Retrofit2: retrofit: 2.1.0'
    //Gson converter
    compile 'com. Squareup. Retrofit2: converter - gson: 2.1.0'
    //RxJava2 Adapter
    compile 'com. Jakewharton. Retrofit: retrofit2 - rxjava2 - adapter: 1.0.0'
    //okhttp
    compile 'com. Squareup. Okhttp3: okhttp: 3.4.1 track'
    compile 'com. Squareup. Okhttp3: logging - interceptor: 3.4.1 track'Copy the code

Then define the Api interface:

public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);
}Copy the code

Then create a Retrofit client:

private static Retrofit create(a) {
            OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
            builder.readTimeout(10, TimeUnit.SECONDS);
            builder.connectTimeout(9, TimeUnit.SECONDS);

            if (BuildConfig.DEBUG) {
                HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
                interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                builder.addInterceptor(interceptor);
            }

            return new Retrofit.Builder().baseUrl(ENDPOINT)
                    .client(builder.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
}Copy the code

Making a request is simple:

        Api api = retrofit.create(Api.class);
        api.login(request)
              .subscribeOn(Schedulers.io())               // Network requests are made in the IO thread
             .observeOn(AndroidSchedulers.mainThread())  // go back to the main thread to process the request result
            .subscribe(new Observer<LoginResponse>() {
                    @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(LoginResponse value) {}

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(mContext, "Login failed", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onComplete(a) {
                    Toast.makeText(mContext, "Login successful", Toast.LENGTH_SHORT).show(); }});Copy the code

It seems perfect, but we missed the point that if the Activity exits during the request, and if we go back to the main thread to update the UI, the APP will crash, what do we do? In the last video we talked about Disposable, which is a switch, When its Dispose () method is called, it cuts off the water pipe and the downstream receives no events. Since no events are received, the UI is no longer updated. So we can store this Disposable in the Activity, and when the Activity exits, we can just cut it off.

We have a CompositeDisposable container built into RxJava, and every time we get a Disposable we call CompositeDisposable.add() to add it to the container. At the time of exit, call CompositeDisposable. The clear () can be cut off all the pipes.

Read-write database

Read/write database, read/write database is also a time-consuming operation, so we also had better put in the IO thread to carry out, this example is relatively simple, directly on the code:

public Observable<List<Record>> readAllRecords() {
        return Observable.create(new ObservableOnSubscribe<List<Record>>() {
            @Override
            public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                Cursor cursor = null;
                try {
                    cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                    List<Record> result = new ArrayList<>();
                    while (cursor.moveToNext()) {
                        result.add(Db.Record.read(cursor));
                    }
                    emitter.onNext(result);
                    emitter.onComplete();
                } finally {
                    if(cursor ! =null) {
                        cursor.close();
                    }
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }Copy the code

That’s it for this tutorial, which will teach you how to use the powerful operators in RxJava. Using these operators, you can easily achieve a variety of effects. Stay tuned.