This is the sixth day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

I write these notes in my spare time. Because the article is long, it is published in several parts; a PDF version is available from my home page.

Thread Control — Scheduler (1)

When no thread is specified, RxJava follows the principle of thread invariance: events are produced on whichever thread calls subscribe(), and events are consumed on the thread that produced them. To switch threads, you need a Scheduler.

1) Scheduler API (I)

In RxJava, a Scheduler acts as a thread controller: RxJava uses it to specify which thread each piece of code runs on. RxJava ships with several built-in Schedulers that cover most use cases:

Schedulers.immediate(): runs directly on the current thread. This is the default Scheduler.

Schedulers.newThread(): always starts a new thread and runs the operation on it.

Schedulers.io(): the Scheduler for I/O operations (reading and writing files, reading and writing databases, network communication, and so on). It behaves much like newThread(), except that io() is backed internally by a thread pool with no upper limit that can reuse idle threads, so in most cases io() is more efficient than newThread(). Do not put computational work on io(); that avoids creating unnecessary threads.

Schedulers.computation(): the Scheduler for computation. Computation here means CPU-intensive work, that is, work whose performance is not limited by operations such as I/O, for example graphics calculations. This Scheduler uses a fixed thread pool whose size equals the number of CPU cores. Do not put I/O operations on computation(); otherwise the time spent waiting on I/O wastes CPU.

In addition, Android has a dedicated AndroidSchedulers.mainThread(), which specifies that the operation runs on the Android main thread.
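The pool behavior described for io() and computation() can be approximated with the JDK's own executors. This is only an analogy under my own assumptions, not RxJava's actual implementation: a cached thread pool stands in for "no upper limit, reuses idle threads", and a fixed pool sized by availableProcessors() stands in for computation(). The names PoolSketch, ioLike and computationLike are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class PoolSketch {
    // Analog of io(): no upper bound on threads, idle threads are reused.
    static ExecutorService ioLike() {
        return Executors.newCachedThreadPool();
    }

    // Analog of computation(): fixed size equal to the number of CPU cores.
    static ExecutorService computationLike() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor io = (ThreadPoolExecutor) ioLike();
        ThreadPoolExecutor cpu = (ThreadPoolExecutor) computationLike();
        System.out.println("io-like max threads: " + io.getMaximumPoolSize());
        System.out.println("computation-like max threads: " + cpu.getMaximumPoolSize());
        io.shutdown();
        cpu.shutdown();
    }
}
```

The contrast in maximum pool size is the point: blocking I/O work can pile up threads without much harm, while CPU-bound work gains nothing from having more threads than cores.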

With these Schedulers in hand, you can control threads with the subscribeOn() and observeOn() methods.

* subscribeOn(): specifies the thread on which subscribe() occurs, that is, the thread on which Observable.OnSubscribe is activated. It is also called the event-producing thread.

* observeOn(): specifies the thread on which the Subscriber runs. It is also called the event-consuming thread.

Prose descriptions are always hard to follow, so here is the code:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // subscribe() occurs on the IO thread
    .observeOn(AndroidSchedulers.mainThread()) // Subscriber callbacks occur on the main thread
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

In the code above, because subscribeOn(Schedulers.io()) is specified, the values 1, 2, 3, 4 are emitted on the IO thread; and because observeOn(AndroidSchedulers.mainThread()) is specified, the Subscriber prints the numbers on the main thread. In fact, writing the two lines subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) before subscribe() is very common. It suits most "fetch data on a background thread, display it on the main thread" program strategies.

The earlier example of loading an image from a resource ID and displaying it, with these two lines added as well, becomes:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
        @Override
        public void call(Subscriber<? super Drawable> subscriber) {
            Drawable drawable = getTheme().getDrawable(drawableRes);
            subscriber.onNext(drawable);
            subscriber.onCompleted();
        }
    })
    .subscribeOn(Schedulers.io()) // subscribe() occurs on the IO thread
    .observeOn(AndroidSchedulers.mainThread()) // Subscriber callbacks occur on the main thread
    .subscribe(new Observer<Drawable>() {
        @Override
        public void onNext(Drawable drawable) {
            imageView.setImageDrawable(drawable);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
        }
    });

As a result, loading the image happens on the IO thread and setting the image happens on the main thread. This means that even if loading the image takes tens or even hundreds of milliseconds, it will not cause the slightest UI lag.
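The "produce on a background thread, consume on another thread" pattern can be imitated in plain Java to see the thread hop happen. This is a sketch under my own assumptions, not how RxJava implements it: two single-thread executors named "io" and "main-like" stand in for Schedulers.io() and the Android main thread, and the class name ThreadHopSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    // Runs the producer on the "io" executor and the consumer on the "main-like"
    // executor, then returns the names of the threads each step actually ran on.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        String[] seen = new String[2];
        try {
            CompletableFuture
                .supplyAsync(() -> {                 // plays the role of subscribeOn(io)
                    seen[0] = Thread.currentThread().getName();
                    return 42;                       // stands in for slow loading work
                }, io)
                .thenAcceptAsync(value ->            // plays the role of observeOn(main)
                    seen[1] = Thread.currentThread().getName(), main)
                .join();                             // wait until both steps have run
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("produced on " + seen[0] + ", consumed on " + seen[1]);
    }
}
```

The caller's thread only wires the steps together; the slow work never runs on the thread that consumes the result, which is why the UI stays responsive in the image example.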

2) Principle of Scheduler (I)

RxJava's Scheduler API is convenient, and a little magical: subscribe() is the outermost method, called directly, and yet it too can be assigned a thread. However, the principle behind Scheduler has to wait until later, because it builds on the principle of transformations covered in the next section.



So this subsection has not really said anything. It is only here to reassure you that I have not forgotten the principle, but am saving it for a more suitable place.

2. Transformation

We’re finally going somewhere cool, and whether you’re excited or not, I am.

RxJava supports transforming event sequences. This is one of its core features and the biggest reason most people say "RxJava is great". A transformation processes the objects in an event sequence, or the whole sequence, into different events or a different sequence. Concepts are always vague and hard to grasp, so let's look at the API.

1) API

Let's start with an example of map():

Observable.just("images/logo.png") // Input type: String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // Parameter type: String
            return getBitmapFromPath(filePath); // Return type: Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // Parameter type: Bitmap
            showBitmap(bitmap);
        }
    });

A class called Func1 appears here. Like Action1, it is an RxJava interface used to wrap a method that takes one parameter. Func1 differs from Action1 in that Func1 wraps a method with a return value. Also, like ActionX, FuncX comes in several variants for methods with different numbers of parameters. The difference between FuncX and ActionX is the same: FuncX wraps a method that returns a value.
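To make the Func1/Action1 distinction concrete, here is a minimal plain-Java sketch. The two interfaces below are local stand-ins that I define for illustration; they mirror the single call() method described above but are not RxJava's own classes, and FuncVsAction and describe are made-up names.

```java
final class FuncVsAction {
    // Local stand-ins for illustration only (not the RxJava classes).
    interface Action1<T> { void call(T value); }   // wraps a method with no return value
    interface Func1<T, R> { R call(T value); }     // wraps a method that returns a value

    static String describe(int n) {
        // A Func1 converts its argument and returns the result.
        Func1<Integer, String> toText = value -> "number:" + value;

        // An Action1 only consumes its argument; here it appends to a log.
        StringBuilder log = new StringBuilder();
        Action1<String> record = text -> log.append(text);

        record.call(toText.call(n));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(3));
    }
}
```

This is why map() takes a Func1 (it needs the converted value back) while subscribe() can take an Action1 (it only needs to consume the value).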

As you can see, map() converts the String in the argument into a Bitmap and returns it. After passing through map(), the event's parameter type has changed from String to Bitmap. This direct conversion of an object is the most common and the easiest transformation to understand. But RxJava's transformations go well beyond that: they can target not only event objects but entire event queues, which makes RxJava very flexible. Here are a few common transformations:

map(): a direct transformation of the event object, as described above. It is the most commonly used transformation in RxJava.

(Figure: map() schematic diagram.)

flatMap(): a very useful but hard-to-understand transformation, so I will cover it in more detail. First, suppose there is a requirement: given a data structure "Student", print the names of a group of students. The implementation is simple:

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

Simple enough. Now suppose you need to print the names of all the courses each student takes. (The difference is that each student has only one name but several courses.) A first implementation could be:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

Still simple. But what if I do not want a for loop in the Subscriber, and want the Subscriber to receive individual Course objects directly (which matters for code reuse)? map() clearly cannot do this, because map() is a one-to-one conversion and what I need is one-to-many. So how do you turn one Student into multiple Courses?

This is where flatMap() comes in:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

As the code above shows, flatMap() has one thing in common with map(): it also converts the argument passed in and returns another object. Note, however, that unlike map(), flatMap() returns an Observable, and this Observable is not sent directly to the Subscriber's callback. flatMap() works like this:

  1. It creates an Observable from the incoming event object.

  2. It does not send this Observable; instead it activates it, so the Observable starts emitting events.

  3. The events emitted by every Observable created this way are merged into one Observable, and that Observable is responsible for handing them to the Subscriber's callback.

These three steps split the events into two levels: the initial objects are "flattened" through a set of newly created Observables and then delivered along a single path. This flattening is the "flat" in flatMap().
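The one-to-many flattening described above can be tried without RxJava by using the JDK's Stream.flatMap, which applies the same idea synchronously. This is an analogy I am assuming for illustration, not RxJava code: Stream.flatMap always preserves order, whereas RxJava's flatMap() merges the inner Observables and may interleave their events. The Student class and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FlattenSketch {
    static final class Student {
        final String name;
        final List<String> courses;

        Student(String name, String... courses) {
            this.name = name;
            this.courses = Arrays.asList(courses);
        }
    }

    // Each Student is turned into an inner stream of course names (one-to-many);
    // flatMap merges all inner streams into one flat stream for the consumer.
    static List<String> courseNames(List<Student> students) {
        return students.stream()
                .flatMap(student -> student.courses.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("Ann", "Math", "Physics"),
                new Student("Bob", "History"));
        System.out.println(courseNames(students)); // prints [Math, Physics, History]
    }
}
```

The consumer at the end only ever sees individual course names, never a Student or a list, which is exactly what removes the for loop from the Subscriber.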

(Figure: flatMap() schematic diagram.)

Extension: because asynchronous code can be added inside the nested Observable, flatMap() is also commonly used for nested asynchronous operations, such as nested network requests. Sample code (Retrofit + RxJava):

networkClient.token() // Returns Observable<String>: requests a token when subscribed to and emits the token on response
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // Returns Observable<Messages>: requests the message list when subscribed to
            // and emits the requested list on response
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // Process and display the message list
            showMessages(messages);
        }
    });

Traditional nested requests have to be implemented with nested Callbacks. With flatMap(), nested requests can be written in a single chain, which keeps the program logic clear.
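The same "chain instead of nesting" idea exists in the JDK as CompletableFuture.thenCompose, which makes for a runnable comparison. This is only an analogy, not RxJava: requestToken() and requestMessages() are hypothetical stand-ins for the two network calls, and they return canned strings rather than contacting any server.

```java
import java.util.concurrent.CompletableFuture;

final class ChainedRequests {
    // Hypothetical stand-in for the first asynchronous request.
    static CompletableFuture<String> requestToken() {
        return CompletableFuture.supplyAsync(() -> "token-123");
    }

    // Hypothetical stand-in for the second request, which needs the token.
    static CompletableFuture<String> requestMessages(String token) {
        return CompletableFuture.supplyAsync(() -> "messages for " + token);
    }

    // thenCompose plays the role flatMap() plays in the chain above: the second
    // request is started from the first result, yet the code stays one flat chain.
    static String fetch() {
        return requestToken()
                .thenCompose(ChainedRequests::requestMessages)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(fetch());
    }
}
```

Written with callbacks, the second request would sit inside the first request's success handler; each further dependent request would add another level of nesting.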

  • throttleFirst(): discards new events for a fixed interval after each emitted event. It is commonly used for debounce filtering, for example on a button click listener:

RxView.clickEvents(button) // RxBinding code
    .throttleFirst(500, TimeUnit.MILLISECONDS) // set the debounce interval to 500 ms
    .subscribe(subscriber);

No more worrying about a user's shaky finger opening two duplicate screens.
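The rule "drop everything for a fixed interval after an accepted event" is easy to sketch in plain Java. This is my own minimal model of the behavior described above, not RxJava's implementation: timestamps are passed in explicitly so the logic can be tested without a clock, and the class name ThrottleFirstSketch is made up.

```java
final class ThrottleFirstSketch {
    private final long windowMillis;
    private long lastAccepted;
    private boolean hasAccepted = false;

    ThrottleFirstSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if the event at the given timestamp should pass through,
    // false if it falls inside the window opened by the last accepted event.
    boolean accept(long timestampMillis) {
        if (hasAccepted && timestampMillis - lastAccepted < windowMillis) {
            return false; // still inside the window: discard
        }
        lastAccepted = timestampMillis; // this event opens a new window
        hasAccepted = true;
        return true;
    }

    public static void main(String[] args) {
        ThrottleFirstSketch throttle = new ThrottleFirstSketch(500);
        long[] clicks = {0, 100, 499, 500, 900, 1000};
        for (long t : clicks) {
            System.out.println(t + " ms -> " + (throttle.accept(t) ? "pass" : "drop"));
        }
    }
}
```

Note that a dropped event does not extend the window; only an accepted event restarts it, so a rapid double click yields one event while a steady stream still lets one event through per interval.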

RxJava also provides many other convenient methods for transforming event sequences; I will not list them one by one here.

2) Transformation principle: lift()

Although these transformations serve different purposes, they are all in essence about processing an event sequence and re-emitting it. Inside RxJava, they are all built on the same basic transformation method: lift(Operator). First, look at the internal implementation of lift() (core code only):

// Note: this is not the actual source of lift(). It is the core code that remains after
// removing the code related to performance, compatibility, and extensibility.
// To see the real source, download it from the RxJava GitHub repository.
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

This code is interesting: it creates and returns a new Observable, and the implementation of call() in the OnSubscribe used to create that new Observable looks just like Observable.subscribe()! Yet they are not the same. The key difference lies in which object onSubscribe refers to in the onSubscribe.call(subscriber) line (warning: the next few sentences may cause serious discomfort). In subscribe(), onSubscribe refers to the OnSubscribe object of the Observable itself, which is straightforward, but things get more complicated once lift() is involved.

When lift() is involved:

  1. lift() creates a new Observable; together with the original one, there are now two Observables.

  2. Likewise, the new OnSubscribe in the new Observable, together with the original OnSubscribe in the original Observable, makes two OnSubscribes.

  3. When the user calls subscribe() on the Observable returned by lift(), it is the new Observable that is used, so the onSubscribe.call(subscriber) it triggers uses the new OnSubscribe in the new Observable, the one created in lift().

  4. Inside the call() method of this new OnSubscribe, onSubscribe refers to the original OnSubscribe in the original Observable. In this call() method, the new OnSubscribe uses operator.call(subscriber) to create a new Subscriber (this is where the Operator, through its own call() method, links the new Subscriber to the original Subscriber and inserts its own "transformation" code), and then uses this new Subscriber to subscribe to the original Observable.

That completes the lift() process. It works somewhat like a proxy mechanism: the event sequence is transformed by intercepting and processing events.

Stripped of the details, it can also be put this way: after an Observable executes lift(Operator), it returns a new Observable, which acts like a proxy that receives the events emitted by the original Observable and sends them on to the Subscriber after processing them.
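The proxy mechanism in the steps above can be reproduced in a few dozen lines of plain Java. This is a deliberately tiny model that I wrote for illustration, not RxJava's source: MiniObservable, Subscriber, OnSubscribe and Operator here are local, simplified types with only onNext(), and error handling, completion, and unsubscription are left out.

```java
import java.util.ArrayList;
import java.util.List;

final class LiftSketch {
    interface Subscriber<T> { void onNext(T value); }
    interface OnSubscribe<T> { void call(Subscriber<T> subscriber); }
    // Given the downstream subscriber, an Operator builds the subscriber
    // that will be handed to the original source.
    interface Operator<R, T> { Subscriber<T> call(Subscriber<R> downstream); }

    static final class MiniObservable<T> {
        private final OnSubscribe<T> onSubscribe;

        MiniObservable(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Subscriber<T> subscriber) {
            onSubscribe.call(subscriber);
        }

        // Returns a NEW observable with a NEW OnSubscribe. When subscribed to, it
        // wraps the caller's subscriber via the operator and subscribes the wrapper
        // to the ORIGINAL OnSubscribe (the field of this instance).
        <R> MiniObservable<R> lift(Operator<R, T> operator) {
            return new MiniObservable<R>(downstream -> {
                Subscriber<T> upstream = operator.call(downstream);
                onSubscribe.call(upstream);
            });
        }
    }

    static List<String> demo() {
        MiniObservable<Integer> source = new MiniObservable<>(s -> {
            s.onNext(1);
            s.onNext(2);
        });
        List<String> received = new ArrayList<>();
        source.<String>lift(downstream -> value -> downstream.onNext("#" + value))
              .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [#1, #2]
    }
}
```

In demo(), the final subscriber never touches the source directly: every value passes through the subscriber the operator created, which is the interception point where the transformation happens.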

If you prefer concrete thinking, here’s a picture:

The same goes for double and multiple lift(), as shown below:

Here is a concrete Operator implementation. The example converts the Integer objects in an event sequence to Strings and is for reference only:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // Convert the Integer objects in the event sequence to String objects
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

The point of explaining how lift() works is only to help you understand RxJava better, so that you can use it better. Whether or not you understand the principle, RxJava does not recommend that developers write custom Operators and call lift() directly. Instead, try to meet your needs by combining the existing methods that wrap lift() (such as map() and flatMap()), because calling lift() directly makes it very easy to introduce errors that are hard to spot.

3) compose(): transforming the whole Observable

Besides lift(), Observable has another transformation method called compose(Transformer). It differs from lift() in that lift() operates on event items and event sequences, whereas compose() transforms the Observable itself. For example, suppose a program has several Observables that all need the same set of lift() transformations. You could write:


observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber4);

You find this far from good software engineering, so you change it to:

private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

Readability and maintainability improve. However, wrapping the Observable in a method does seem to restrict the Observable's flexibility somewhat. What to do? This is where compose() comes in:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

As above, with compose() the Observable uses the call() method of the Transformer object passed in to process itself directly, so it no longer needs to be wrapped in a method.

The principle of compose() is simple.
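Since the principle really is just "hand the whole source to a reusable function", it can be sketched with JDK streams. This is an analogy under my own assumptions rather than RxJava code: a Function from Stream to Stream plays the role of the Transformer, the static compose() helper plays the role of Observable.compose(), and the three steps inside LIFT_ALL are arbitrary placeholders for the shared chain of transformations.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ComposeSketch {
    // A reusable bundle of steps, playing the role of a Transformer<Integer, String>.
    static final Function<Stream<Integer>, Stream<String>> LIFT_ALL =
            stream -> stream
                    .filter(n -> n % 2 == 0)   // placeholder step 1
                    .map(n -> n * 10)          // placeholder step 2
                    .map(n -> "value:" + n);   // placeholder step 3

    // Plays the role of compose(): the transformer receives the whole source
    // and returns the transformed source.
    static <T, R> Stream<R> compose(Stream<T> source,
                                    Function<Stream<T>, Stream<R>> transformer) {
        return transformer.apply(source);
    }

    static List<String> demo(Integer... values) {
        return compose(Arrays.stream(values), LIFT_ALL).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo(1, 2, 3, 4)); // prints [value:20, value:40]
        System.out.println(demo(6));          // prints [value:60]
    }
}
```

The same LIFT_ALL object is applied to two different sources in main(), which is the reuse that the four compose(liftAll) lines above rely on.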