Preface:

Want to see the source code of Rxjava for a long time, has not been able to stick to it. Based on this idea, I plan to write an article while reading it, which is also my first shared article.

Writing is not good, writing may be more dry, understanding is not necessarily in place, but also please forgive me.

I will also write some of my questions into the article, but also look forward to a lot of comments, generous comments.

The creation operator -Create

Observable.subscribe(Observer)

Create an Observable and subscribe to it. This is the most basic operation:

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer i) { Log.d("BlackC0", "onNext: " + i); } @Override public void onError(Throwable e) { Log.d("BlackC0", "onError: "+ e.toString()); } @Override public void onComplete() { Log.d("BlackC0", "onComplete()"); }});Copy the code

So who created Emitter here, and what do they do when they subscribe? So let’s see how it works.

@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); / / to empty the try {observer = RxJavaPlugins. OnSubscribe (this, the observer). / / global configuration intercept ObjectHelper. RequireNonNull (the observer, "Plugin returned null observer"); / / to empty subscribeActual (observer); } catch (NullPointerException e) {NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); Npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } -------------------------------------------------------------------------------------------- @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); / / is via global intercept and then eventually return} -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); // Emitter is the interface that creates and implements Disposable try {source.subscribe(parent); } catch (Throwable ex) {Exceptions. ThrowIfFatal (ex); parent.onError(ex); }}Copy the code

Here first created a observables, and have a ObservableOnSubscribe an anonymous inner class instance, implements the ObservableOnSubscribe. The subscribe () method. We then create an Observer and subscribe to an Observable, using the observable.subscribe () method. The observables. SubscribeActual () the actual subscription method, the observer is Emitter by packaging, In the observables are called ObservableOnSubscribe calls the ObservableOnSubscribe. The subscribe () method, and the emitter to send out.

OnNext(), OnError(), onComplete()

This completes a subscription, and how data is sent to the Observer from an Observable once the subscription starts. We know that data is being sent by an Emitter, so let’s take a look at an example of an emitter.

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @override public void onNext(T T) { If (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (! IsDisposed ()) {// Determine whether the subscription ends observer.onnext (t); } } @Override public void onError(Throwable t) { if (! TryOnError (t)) {// Emitter onError() will be intercepted by tryOnError and thrown into the Observer layer rxJavaplugins.onerror (t); Override public Boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (! isDisposed()) { try { observer.onError(t); } finally { dispose(); // You can see why the subscription is terminated after onError(), because the subscription is unsubscribed} return true; } return false; } Override public void onComplete() {Override public void onComplete() {Override public void onComplete() {Override public void onComplete()} { if (! isDisposed()) { try { observer.onComplete(); Dispose (); dispose(); dispose(); } } } @Override public void setDisposable(Disposable d) { DisposableHelper.set(this, d); } @Override public void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public ObservableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); }}Copy the code

The Observer OnNext(), OnError(), and OnComplete() methods are all called directly by an Emitter. This Emitter is one of several instances of the Emitter. It is also possible to implement custom Emitter intercepts.

Disposable

Subscription, data transfer through the above code, have a certain understanding. So what is the process of active cancellation of subscription? Now we have clues to the two methods emitter implements: Dispose () and isDisposed(). So let’s take a look at what this method says. Because the code is longer, I’ll just extract the main methods.

public enum DisposableHelper implements Disposable { /** * The singleton instance representing a terminal, disposed state, don't leak it. */ DISPOSED ; public static boolean isDisposed(Disposable d) { return d == DISPOSED; } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current ! = d) { current = field.getAndSet(d); if (current ! = d) { if (current ! = null) { current.dispose(); } return true; } } return false; } @Override public void dispose() { // deliberately no-op } @Override public boolean isDisposed() { return true; }}Copy the code

You can see DisposableHelper itself is an enumeration class, singling out an unsubscribed Disposable. And confirm the nature of the subscription is cancelled, is to use the current to the Disposable and DisposableHelper DISPOSED to do contrast, if the two do not agree, then there is no cancellation, if both agree, will represent the current subscription has been cancelled. And cancel the subscription and will be replaced the Disposable DisposableHelper. The original DISPOSED, since AtomicReference involves my blind area of knowledge, there is no need to do too much.

conclusion

That’s all you need to know about the Create operator