Observables publish data and consume it by the Observer. Consumers should have the ability to give up operating resources. The design of Disposable is decoupled, and the state management is aimed at the Observer end. Therefore, the Disposable interface should be implemented along with the implementation of Observer, so that the consumer end has the ability of state management. This capability alone is not enough. It also needs the cooperation of an Observable.

@CheckReturnValue
@SchedulerSupport("none")
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    LambdaObserver<T> ls = new LambdaObserver(onNext, onError, onComplete, Functions.emptyConsumer());
    this.subscribe((Observer)ls);
    return ls;
}
Copy the code

Observe the Observable core method and see that the method returns a Disposable Observer, which is returned as Disposable and retrieved by our program so that we can control the Observer state.

// State management interface, non-functional interface, cannot be used functionally quickly. public interface Disposable { void dispose(); boolean isDisposed(); }Copy the code

ObservableCache#subscribeActual(Observer<? Super T> T)

protected void subscribeActual(Observer<? super T> t) { ObservableCache.CacheDisposable<T> consumer = new ObservableCache.CacheDisposable(t, this); t.onSubscribe(consumer); this.add(consumer); ObservableCache is actually called, so the Observer and ObservableCache can control state separately. if (! this.once.get() && this.once.compareAndSet(false, true)) { this.source.subscribe(this); } else { this.replay(consumer); }} You can see that CacheDisposable is wrapped first in the subscribeActual() method that actually generates subscriptions: static final class CacheDisposable<T> extends AtomicInteger implements Disposable { private static final long serialVersionUID = 6770240836423125754L; final Observer<? super T> downstream; final ObservableCache<T> parent; ObservableCache.Node<T> node; int offset; long index; volatile boolean disposed; CacheDisposable(Observer<? super T> downstream, ObservableCache<T> parent) { this.downstream = downstream; this.parent = parent; this.node = parent.head; } public void dispose() { if (! this.disposed) { this.disposed = true; this.parent.remove(this); } } public boolean isDisposed() { return this.disposed; // If an Observable is an infinite stream, leaving EMPTY directs the Observable to execute indefinitely, even if the subscriber terminates it. void remove(CacheDisposable<T> consumer) { for (;;) { CacheDisposable<T>[] current = observers.get(); int n = current.length; if (n == 0) { return; } int j = -1; for (int i = 0; i < n; i++) { if (current[i] == consumer) { j = i; break; } } if (j < 0) { return; } CacheDisposable<T>[] next; // Always leave an EMPTY if (n == 1) {next = EMPTY; } else { next = new CacheDisposable[n - 1]; System.arraycopy(current, 0, next, 0, j); System.arraycopy(current, j + 1, next, j, n - j - 1); } if (observers.compareAndSet(current, next)) { return; }}} ObservableSource<T>.subscribe(this) internally calls the Observer onNext(), onError(), or onComplete() methods. For observers, to actively terminate the subscription, ObservableSource<T>.subscribe(this). @override public void onNext(T T) {int tailOffset = this.tailOffset; // if the current tail node is full, create a fresh node if (tailOffset == capacityHint) { Node<T> n = new Node<T>(tailOffset); n.values[0] = t; this.tailOffset = 1; tail.next = n; tail = n; } else { tail.values[tailOffset] = t; this.tailOffset = tailOffset + 1; } size++; for (CacheDisposable<T> consumer : observers.get()) { replay(consumer); }} // When onError() and onComplete() are complete, implement observers. GetAndSet (TERMINATED), set the array of subscribers to null, and return the previous array of subscribers, iterating through to let them end their processing. @SuppressWarnings("unchecked") @Override public void onError(Throwable t) { error = t; done = true; for (CacheDisposable<T> consumer : observers.getAndSet(TERMINATED)) { replay(consumer); } } @SuppressWarnings("unchecked") @Override public void onComplete() { done = true; for (CacheDisposable<T> consumer : observers.getAndSet(TERMINATED)) { replay(consumer); Now let's see if (consumer.prompt) {consumer.node = null; return; } Because ObservableCache implements an Observer, the Observer is ObservableCache.Copy the code

Both Observables and observers should have the ability to unsubscribe, but Observer unsubscribe and Observable unsubscribe should not be confused. So at subscription time, the Observer is first wrapped as CacheDisposable using the Decorator design pattern, which gives the subscriber the ability to unsubscribe. ObservableCache handles CacheDisposable as well as onNext(), onComplete(), onError(), ObservableCache. Finally, source.subscribe(this) connects the two to achieve functional decoupling.