One: What is RxJava?
GitHub about RxJava
a library for composing asynchronous and event-based programs by using observable sequences
What he means is an asynchronous and event-based library composed of observable sequences.
The advent of RxJava eliminates synchronization issues, thread safety, and more
In general, it is convenient for us to program asynchronously.
Two: advantages and disadvantages of RxJava
advantages
asynchronous
Chain call structure
Simplicity can be maintained when using complex asynchronous invocation methods
disadvantages
The cost of learning is relatively high, the threshold of entry is relatively high
Difficult to understand API, you need to look at the source code to understand the specific effects of the API
Three: basic use of RxJava
First understand the basic steps of its use:
- Create an Observable
- Create an Observer
- Subscribe
1. Create the observed
Normal creation of observed:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("ONE");
emitter.onNext("TWO");
emitter.onNext("THREE"); emitter.onComplete(); }});Copy the code
There are four events: One, Two, Three, and the end.
PS:
Normal create first bullet:
Observable observable = Observable.just("ONE","TWO","THREE");
Normal create second bullet:
String[] values = {"ONE"."TWO"."THREE"};
Observable observable = Observable.fromArray(values);
Copy the code
In fact, this abnormal creation is an internal wrapper that sends this information to the observer as an event like onNext().
2. Create an observer
Normal creation:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("z"."onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i("z"."onNext: s = " + s);
}
@Override
public void onError(Throwable e) {
Log.i("z"."onError: ");
}
@Override
public void onComplete() {
Log.i("z"."onComplete: "); }};Copy the code
Abnormal creation:
Consumer<String> observer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("z"."accept: s = "+ s); }};Copy the code
3. To subscribe to
observable.subscribe(observer);
You have noticed something different. Why did the observer subscribe to the observer?
This happens because RxJava keeps chained calls flowing.
4. Asynchronous invocation
Since RxJava is an asynchronous library, of course it is better for asynchronous processing
Before we look at RxJava’s asynchronous invocation, let’s take a look at two of the more important points
- subscribeOn()
- observeOn()
subscribeOn
An Observable is created in a specific environment. It can only be used once.
observeOn
Indicates the context in which event delivery and final processing took place. It can be called multiple times, and after each invocation, the next step takes effect.
Such as:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("ONE");
emitter.onNext("TWO");
emitter.onNext("THREE"); emitter.onComplete(); }}) // The observed creates.subscribeon (schedulers.newthread ()) in a new thread. ObserveOn (schedulers.io ()).map(new) Function<String, String>() { @Override public String apply(String s) throws Exception {returns.toLowerCase(); }}) // The observer is in the main thread. ObserveOn (AndroidSchedulers. MainThread ()). The subscribe (new Consumer < String > () {@ Override public void accept(String s) throws Exception { } });Copy the code
Four: RxJava basic subscription process
Let’s take a look at the basic invocation:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe: ");
emitter.onNext("ONE");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: e = " + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete: "); }}); }});Copy the code
Results:
onSubscribe:
subscribe:
onNext: s = ONE
Copy the code
Let’s start with the subscribe method, which is the subscribe method
public final void subscribe(Observer<? super T> observer) { ... // Ignore part of the source code subscribeActual(observer); . // Ignore some source code}Copy the code
Go directly to the main method subscribeActual(Observer), which is an abstract method that is implemented in a subclass.
So let’s look at subclasses of Observable:
Let’s go to the create method:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source."source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Copy the code
The object that is returned is the ObservableCreate object,
ObservableCreate is a subclass of ObservableCreate, an ObservableCreate subclass of ObservableCreate, an ObservableCreate subclass of Observable.
The subscribeActual method is implemented in ObservableCreate
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); // Trigger observer here#onSubscribe()observer.onSubscribe(parent); Try {// call source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); }}Copy the code
Here the method can see that the onSubscribe of the observer occurs before the callback.
We then call the ObservableOnSubscribe method subscribe
It’s up to the developer to do that. As you can see in this example CreateEmitter is called, go inside CreateEmitter and see how onNext() is implemented
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
Copy the code
You can see that onNext() of the Observer is called in CreateEmitter’s onNext().
You can then see the call in the case:
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
Copy the code