Start with the simplest use:

Observable.just("Url")
              .subscribe(new Observer<String>() {
                  @Override
                  public void onSubscribe(Disposable d) {
                      
                  }

                  @Override
                  public void onNext(String value) {

                  }

                  @Override
                  public void onError(Throwable e) {

                  }

                  @Override
                  public void onComplete() {

                  }
              });

**just("Url")**

#Observable:
@SchedulerSupport(SchedulerSupport.NONE)
  public static <T> Observable<T> just(T item) {
      ObjectHelper.requireNonNull(item, "The item is null");
      return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
  }

Next, look at RxJavaPlugins.onAssembly():

public static <T> Observable<T> onAssembly(Observable<T> source) {
      Function<Observable, Observable> f = onObservableAssembly;
      if (f != null) {
          return apply(f, source);
      }
      return source;
  }

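When no assembly hook is registered, onAssembly() returns its argument unchanged, so just() returns the new ObservableJust<T>(item) instance. The ObservableJust class looks like this: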

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

  private final T value;

  public ObservableJust(final T value) {
      this.value = value;
  }

  @Override
  protected void subscribeActual(Observer<? super T> s) {
      ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
      s.onSubscribe(sd);
      sd.run();
  }

  @Override
  public T call() {
      return value;
  }
}

So just() wraps the value we pass in (typed by the generic parameter T) in an ObservableJust, which stores it in its value field.
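The onAssembly() hook shown earlier follows a simple pattern: a global, optional function that is applied to every newly built source, and skipped when it is null. Here is a minimal sketch of that pattern in plain Java. The names MiniPlugins and onAssemblyHook are hypothetical and not part of RxJava; the sketch only assumes the "apply the hook if present, otherwise return the source" behavior visible in the snippet above.

```java
import java.util.function.Function;

// Hypothetical stand-in for RxJavaPlugins: one optional, global assembly hook.
class MiniPlugins {
    // null means "no hook installed", which is the default state.
    static volatile Function<Object, Object> onAssemblyHook;

    // Returns the source unchanged unless a hook is installed.
    @SuppressWarnings("unchecked")
    static <T> T onAssembly(T source) {
        Function<Object, Object> f = onAssemblyHook;
        if (f != null) {
            return (T) f.apply(source);
        }
        return source;
    }

    public static void main(String[] args) {
        String source = "ObservableJust(Url)";
        // No hook installed: the very same object comes back.
        System.out.println(onAssembly(source));

        // With a hook installed, every assembled source passes through it.
        onAssemblyHook = s -> "traced:" + s;
        System.out.println(onAssembly(source));
        onAssemblyHook = null;
    }
}
```

In normal use no hook is installed, which is why just() can be read as simply returning the ObservableJust it created.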

Second, the source code of the subscribe() method:

@Override
  public final void subscribe(Observer<? super T> observer) {
      ObjectHelper.requireNonNull(observer, "observer is null");
      try {
          observer = RxJavaPlugins.onSubscribe(this, observer);

          ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

          // The actual subscription work is delegated to subscribeActual()
          subscribeActual(observer);
      } catch (NullPointerException e) { // NOPMD
          throw e;
      } catch (Throwable e) {
          Exceptions.throwIfFatal(e);
          RxJavaPlugins.onError(e);

          NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
          npe.initCause(e);
          throw npe;
      }
  }

protected abstract void subscribeActual(Observer<? super T> observer);


As you can see, the observer argument is passed on to subscribeActual(observer). That method is abstract in Observable, so we look at its implementation in the subclass ObservableJust:

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

  private final T value;

  public ObservableJust(final T value) {
      this.value = value;
  }

  @Override
  protected void subscribeActual(Observer<? super T> s) {
      ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
      // Call the Observer's onSubscribe(sd)
      s.onSubscribe(sd);
      // Then call sd's run() method
      sd.run();
  }

  @Override
  public T call() {
      return value;
  }
}

In subscribeActual(), the code first creates a ScalarDisposable from the Observer instance and the value, then calls the Observer's onSubscribe() method, and finally executes sd.run().

Finally, we look at the run() method of ScalarDisposable, which completes the process:

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
  ....
  final Observer<? super T> observer;

  final T value;

  public ScalarDisposable(Observer<? super T> observer, T value) {
      this.observer = observer;
      this.value = value;
  }
  ...
  @Override
  public void run() {
      if (get() == START && compareAndSet(START, ON_NEXT)) {
          // Call the Observer's onNext() method with the value we passed in
          observer.onNext(value);
          if (get() == ON_NEXT) {
              lazySet(ON_COMPLETE);
              // Call the Observer's onComplete() method
              observer.onComplete();
          }
      }
  }
}
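The run() method above relies on a small state machine stored in the AtomicInteger that ScalarDisposable extends: only the caller that moves the state from START to ON_NEXT is allowed to emit. The following is a simplified sketch of that idea. The class name OneShotEmitter, the log list, and the numeric values of the constants are assumptions made for illustration and do not match the library's real values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of ScalarDisposable's state handling.
class OneShotEmitter extends AtomicInteger {
    // Illustrative state values (assumed, not the library's constants).
    static final int START = 0;
    static final int ON_NEXT = 1;
    static final int ON_COMPLETE = 2;

    final List<String> log = new ArrayList<>();
    final String value;

    OneShotEmitter(String value) {
        this.value = value;
    }

    void run() {
        // Only the caller that wins the START -> ON_NEXT transition may emit.
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            log.add("onNext:" + value);
            // Complete only if nothing changed the state in the meantime.
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                log.add("onComplete");
            }
        }
    }

    public static void main(String[] args) {
        OneShotEmitter e = new OneShotEmitter("Url");
        e.run();
        e.run(); // no-op: the state is no longer START
        System.out.println(e.log);
    }
}
```

Calling run() a second time does nothing, which is how a single-value source avoids emitting its item twice.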

So the Observer's methods are not called directly in the subscribe() method of the Observable class. They are called from two other classes: ObservableJust (onSubscribe) and ScalarDisposable (onNext and onComplete).
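To tie the steps together, here is a minimal sketch of the same call chain in plain Java: a final subscribe() that delegates to an abstract subscribeActual(), and a "just" subclass that emits on subscription. All names (MiniObservable, MiniJust, MiniObserver) are hypothetical, and the observer interface is simplified (no Disposable, no onError), so this illustrates the control flow only and is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class MiniJustDemo {
    // Simplified observer: onSubscribe takes no Disposable here (assumption for brevity).
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
        void onComplete();
    }

    // Plays the role of Observable: subscribe() delegates to subscribeActual().
    abstract static class MiniObservable<T> {
        final void subscribe(MiniObserver<? super T> observer) {
            if (observer == null) {
                throw new NullPointerException("observer is null");
            }
            subscribeActual(observer);
        }

        abstract void subscribeActual(MiniObserver<? super T> observer);

        static <T> MiniObservable<T> just(T item) {
            return new MiniJust<>(item);
        }
    }

    // Plays the role of ObservableJust: stores the value, emits it when subscribed.
    static final class MiniJust<T> extends MiniObservable<T> {
        private final T value;

        MiniJust(T value) {
            this.value = value;
        }

        @Override
        void subscribeActual(MiniObserver<? super T> observer) {
            observer.onSubscribe();
            observer.onNext(value);
            observer.onComplete();
        }
    }

    // Subscribes once and records the order in which the callbacks fire.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniObservable.just("Url").subscribe(new MiniObserver<String>() {
            @Override public void onSubscribe() { events.add("onSubscribe"); }
            @Override public void onNext(String value) { events.add("onNext:" + value); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Nothing happens until subscribe() is called; the emission is driven entirely by the subscription.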

(2) Sometimes we use the shorter form, subscribe(new Consumer...).

First, Consumer is an interface with a single accept() method:

public interface Consumer<T> {
  void accept(T t) throws Exception;
}

The single-argument subscribe() overload looks like this:

public final Disposable subscribe(Consumer<? super T> onNext) {
  return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

We see that it passes our Consumer instance as an argument to another subscribe() overload, together with some static values from the Functions class. Opening that class, we find two Consumer instances and one Action:

public static final Consumer<Throwable> ERROR_CONSUMER = new Consumer<Throwable>() {
  @Override
  public void accept(Throwable error) {
      RxJavaPlugins.onError(error);
  }
};

public static final Action EMPTY_ACTION = new Action() {
  @Override
  public void run() { }

  @Override
  public String toString() {
      return "EmptyAction";
  }
};

static final Consumer<Object> EMPTY_CONSUMER = new Consumer<Object>() {
  @Override
  public void accept(Object v) { }

  @Override
  public String toString() {
      return "EmptyConsumer";
  }
};

Now let's look at the subscribe() overload that it delegates to:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
      Action onComplete, Consumer<? super Disposable> onSubscribe) {
  ObjectHelper.requireNonNull(onNext, "onNext is null");
  ObjectHelper.requireNonNull(onError, "onError is null");
  ObjectHelper.requireNonNull(onComplete, "onComplete is null");
  ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

  // LambdaObserver implements the Observer interface; our Consumer implementation is passed into it
  LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

  // Subscribe with this Observer
  subscribe(ls);

  return ls;
}

From the above analysis, we know that the four Observer methods in the LambdaObserver class, such as onNext(), receive the emitted value and forward it to the matching Consumer (or Action) instance that we supplied:

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

  private static final long serialVersionUID = -7251123623727029452L;
  final Consumer<? super T> onNext;
  final Consumer<? super Throwable> onError;
  final Action onComplete;
  final Consumer<? super Disposable> onSubscribe;

  public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
          Action onComplete,
          Consumer<? super Disposable> onSubscribe) {
      super();
      this.onNext = onNext;
      this.onError = onError;
      this.onComplete = onComplete;
      this.onSubscribe = onSubscribe;
  }

  @Override
  public void onSubscribe(Disposable s) {
      if (DisposableHelper.setOnce(this, s)) {
          try {
              onSubscribe.accept(this);
          } catch (Throwable ex) {
              Exceptions.throwIfFatal(ex);
              s.dispose();
              RxJavaPlugins.onError(ex);
          }
      }
  }

  @Override
  public void onNext(T t) {
      try {
          onNext.accept(t);
      } catch (Throwable e) {
          Exceptions.throwIfFatal(e);
          onError(e);
      }
  }

  @Override
  public void onError(Throwable t) {
      dispose();
      try {
          onError.accept(t);
      } catch (Throwable e) {
          Exceptions.throwIfFatal(e);
          RxJavaPlugins.onError(e);
          RxJavaPlugins.onError(t);
      }
  }

  @Override
  public void onComplete() {
      dispose();
      try {
          onComplete.run();
      } catch (Throwable e) {
          Exceptions.throwIfFatal(e);
          RxJavaPlugins.onError(e);
      }
  }

  @Override
  public void dispose() {
      DisposableHelper.dispose(this);
  }

  @Override
  public boolean isDisposed() {
      return get() == DisposableHelper.DISPOSED;
  }
}
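The adapter idea behind LambdaObserver can be sketched without RxJava: an object implements the full observer interface and forwards each call to a separately supplied callback. In the sketch below, MiniObserver, MiniLambdaObserver and fromOnNext are hypothetical names, java.util.function.Consumer and Runnable stand in for RxJava's Consumer and Action, and disposal handling is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LambdaAdapterDemo {
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    // Plays the role of LambdaObserver: each Observer method forwards to a callback.
    static final class MiniLambdaObserver<T> implements MiniObserver<T> {
        final Consumer<? super T> onNext;
        final Consumer<? super Throwable> onError;
        final Runnable onComplete;

        MiniLambdaObserver(Consumer<? super T> onNext,
                           Consumer<? super Throwable> onError,
                           Runnable onComplete) {
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
        }

        @Override
        public void onNext(T value) {
            try {
                onNext.accept(value);
            } catch (Throwable e) {
                // A failing onNext callback is routed to the error callback.
                onError(e);
            }
        }

        @Override
        public void onError(Throwable e) {
            onError.accept(e);
        }

        @Override
        public void onComplete() {
            onComplete.run();
        }
    }

    // Single-callback form: fills in defaults for the callbacks the caller omitted.
    static <T> MiniObserver<T> fromOnNext(Consumer<? super T> onNext) {
        return new MiniLambdaObserver<T>(onNext,
                e -> System.err.println("unhandled: " + e),
                () -> { });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObserver<String> o = new MiniLambdaObserver<String>(
                v -> {
                    if (v.isEmpty()) {
                        throw new IllegalArgumentException("empty");
                    }
                    log.add("next:" + v);
                },
                e -> log.add("error:" + e.getMessage()),
                () -> log.add("complete"));
        o.onNext("Url");
        o.onNext(""); // the callback throws, so the adapter calls onError
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The fromOnNext() helper mirrors the single-argument subscribe(Consumer) overload: the caller supplies one callback and the rest are defaulted.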

## Transform operator map: here is code that applies map twice

.map(new Function<String, Bitmap>() {
                  @Override
                  public Bitmap apply(String s) throws Exception {
                      URL url= new URL(s);
                      HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                      InputStream inputStream= urlConnection.getInputStream();
                      Bitmap bitmap= BitmapFactory.decodeStream(inputStream);
                      return bitmap;
                  }
              })
              .map(new Function<Bitmap, Bitmap>() {
                  @Override
                  public Bitmap apply(Bitmap bitmap) throws Exception {
                      return createWatermark(bitmap, "Rxjava");
                  }
              })

In the flow of events there is only upstream and downstream. map() relies on the Function interface, which has two generic type parameters, the input type T and the result type R:

public interface Function<T, R> {
 
  R apply(T t) throws Exception;
}

ObservableMap:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
  ObjectHelper.requireNonNull(mapper, "mapper is null");
  // onAssembly() returns its argument: a new ObservableMap that receives `this`
  // (the Observable that map() was called on) and the Function instance
  return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
  final Function<? super T, ? extends U> function;

  // Saves the previous Observable (the one map() was called on) and this Function instance
  public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
      super(source);
      this.function = function;
  }

  // If there is more than one map operation, subscription walks upstream through each
  // subscribeActual() until it reaches the original source (for example, ObservableJust),
  // whose subscribeActual() sends the value we passed to the Observer's onNext method.
  @Override
  public void subscribeActual(Observer<? super U> t) {
      source.subscribe(new MapObserver<T, U>(t, function));
  }
  // In effect: ObservableJust.subscribe(new MapObserver<T, U>(t, function)),
  // so the URL we passed arrives as the argument of MapObserver.onNext().

  static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
      final Function<? super T, ? extends U> mapper;

      // Receives the downstream Observer that subscribes here, and the current Function
      MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
          super(actual);
          this.mapper = mapper;
      }

      @Override
      public void onNext(T t) {
          if (done) {
              return;
          }

          if (sourceMode != NONE) {
              actual.onNext(null);
              return;
          }

          // Apply the transformation
          U v;

          try {
              v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
          } catch (Throwable ex) {
              fail(ex);
              return;
          }
          // Pass the transformed data to the downstream Observer
          actual.onNext(v);
      }

      @Override
      public int requestFusion(int mode) {
          return transitiveBoundaryFusion(mode);
      }

      @Override
      public U poll() throws Exception {
          T t = qs.poll();
          return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
      }
  }
}
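The way map() chains sources can be sketched in a few lines: each map() call returns a new observable that remembers the one it was called on, and subscribing wraps the downstream observer before passing the subscription upstream. The following is a minimal sketch under simplifying assumptions (only onNext, no fusion, no error handling); MiniObservable and MiniObserver are hypothetical names, and java.util.function.Function stands in for RxJava's Function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class MiniMapDemo {
    interface MiniObserver<T> {
        void onNext(T value);
    }

    abstract static class MiniObservable<T> {
        abstract void subscribe(MiniObserver<? super T> observer);

        // Like map(): returns a new observable that keeps `this` as its source.
        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            MiniObservable<T> source = this;
            return new MiniObservable<R>() {
                @Override
                void subscribe(MiniObserver<? super R> downstream) {
                    // Like MapObserver: wrap the downstream observer, then subscribe upstream.
                    source.subscribe(value -> downstream.onNext(mapper.apply(value)));
                }
            };
        }

        // Like just(): emits the stored item to whoever subscribes.
        static <T> MiniObservable<T> just(T item) {
            return new MiniObservable<T>() {
                @Override
                void subscribe(MiniObserver<? super T> observer) {
                    observer.onNext(item);
                }
            };
        }
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        MiniObservable.just("Url")
                .map(String::length)      // String -> Integer
                .map(n -> "len=" + n)     // Integer -> String
                .subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Subscription travels from the last map() up to the source, and the value then travels back down through each wrapper in order.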

## The subscribeOn() source code:

.subscribeOn(Schedulers.io()) // everything above this line executes on a worker thread
.observeOn(AndroidSchedulers.mainThread()) // everything below this line executes on the main thread

Let’s get straight to the highlights:

ObservableSubscribeOn:

public final Observable<T> subscribeOn(Scheduler scheduler) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

// The main thing to look at is the subscribeActual() method
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
  final Scheduler scheduler;

  public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
      super(source);
      this.scheduler = scheduler;
  }

  @Override
  public void subscribeActual(final Observer<? super T> s) {
      // Create a proxy Observer, SubscribeOnObserver
      final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
      // Call onSubscribe() on the downstream Observer, passing the proxy
      s.onSubscribe(parent);
      // Equivalent to parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
      Disposable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
      parent.setDisposable(disposable);
  }

  final class SubscribeTask implements Runnable {
      private final SubscribeOnObserver<T> parent;

      SubscribeTask(SubscribeOnObserver<T> parent) {
          this.parent = parent;
      }

      @Override
      public void run() {
          source.subscribe(parent);
      }
  }
}

// Schedulers.io() returns IoHolder.DEFAULT
static final class IOTask implements Callable<Scheduler> {
  @Override
  public Scheduler call() throws Exception {
      return IoHolder.DEFAULT;
  }
}

static final class IoHolder {
  static final Scheduler DEFAULT = new IoScheduler();
}

// scheduler.scheduleDirect() eventually calls this method to produce the Disposable
// thread pool + Runnable
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
  final Worker w = createWorker();

  final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

  // Wrap the task in a DisposeTask
  DisposeTask task = new DisposeTask(decoratedRun, w);

  // Use the thread pool to execute the task
  w.schedule(task, delay, unit);

  return task;
}
// source.subscribe(parent) therefore executes on the worker thread,
// and everything further upstream is processed on that worker thread as well.
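The core of subscribeOn() is that the call to source.subscribe(...) itself is wrapped in a task and handed to a scheduler, so the upstream's emission code runs on the scheduler's thread. Here is a minimal sketch of that idea using a plain ExecutorService in place of a Scheduler. MiniSource, the subscribeOn helper, and the thread name "mini-io" are hypothetical, and disposal is not modeled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class MiniSubscribeOnDemo {
    // A source is modeled as "something that can be subscribed to with a callback".
    interface MiniSource<T> {
        void subscribe(Consumer<? super T> observer);
    }

    // Like ObservableSubscribeOn: subscribing to the result schedules a task
    // (the analogue of SubscribeTask) that subscribes to the upstream on the executor.
    static <T> MiniSource<T> subscribeOn(MiniSource<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Returns the name of the thread on which the source emitted.
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-io"));
        try {
            // The source reports the thread on which its emission code runs.
            MiniSource<String> source =
                    observer -> observer.accept(Thread.currentThread().getName());

            CompletableFuture<String> seen = new CompletableFuture<>();
            subscribeOn(source, io).subscribe(seen::complete);
            return seen.join(); // wait for the value emitted on the worker thread
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the subscription is what gets scheduled, the position of subscribeOn() affects the code upstream of it rather than the code downstream.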

If you do not call observeOn(), the final Observer runs on that same worker thread. Here is how observeOn() moves it to another thread:

ObservableObserveOn:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  ObjectHelper.verifyPositive(bufferSize, "bufferSize");
  return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

// The main thing to look at is the subscribeActual() method
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
  @Override
  protected void subscribeActual(Observer<? super T> observer) {
      // ......
      // Create a Scheduler.Worker (a HandlerWorker for the main-thread scheduler)
      Scheduler.Worker w = scheduler.createWorker();
      // Pass the Worker and the downstream observer to a new ObserveOnObserver
      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  }
}

// The schedule() method of ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
  implements Observer<T>, Runnable {

      void schedule() {
          if (getAndIncrement() == 0) {
              // Schedule this Runnable on the worker
              worker.schedule(this);
          }
      }

      // Executed when the Handler processes the message
      public void run() {
          if (outputFused) {
              drainFused();
          } else {
              drainNormal();
          }
      }

      // 1. drainNormal()
      void drainNormal() {
          int missed = 1;

          final SimpleQueue<T> q = queue;
          final Observer<? super T> a = actual;

          for (;;) {
              if (checkTerminated(done, q.isEmpty(), a)) {
                  return;
              }

              for (;;) {
                  boolean d = done;
                  T v;

                  try {
                      v = q.poll();
                  } catch (Throwable ex) {
                      Exceptions.throwIfFatal(ex);
                      s.dispose();
                      q.clear();
                      a.onError(ex);
                      return;
                  }
                  boolean empty = v == null;

                  if (checkTerminated(d, empty, a)) {
                      return;
                  }

                  if (empty) {
                      break;
                  }

                  a.onNext(v);
              }

              missed = addAndGet(-missed);
              if (missed == 0) {
                  break;
              }
          }
      }

      // 2. drainFused()
      void drainFused() {
          int missed = 1;

          for (;;) {
              if (cancelled) {
                  return;
              }

              boolean d = done;
              Throwable ex = error;

              if (!delayError && d && ex != null) {
                  actual.onError(error);
                  worker.dispose();
                  return;
              }

              actual.onNext(null);

              if (d) {
                  ex = error;
                  if (ex != null) {
                      actual.onError(ex);
                  } else {
                      actual.onComplete();
                  }
                  worker.dispose();
                  return;
              }

              missed = addAndGet(-missed);
              if (missed == 0) {
                  break;
              }
          }
      }
}

// MAIN_THREAD
public final class AndroidSchedulers {
  private static final class MainHolder {
      // new Handler(Looper.getMainLooper()): a Handler bound to the main thread's Looper
      static final Scheduler MAIN_THREAD = new HandlerScheduler(new Handler(Looper.getMainLooper()));
  }

  // So AndroidSchedulers.mainThread() ultimately returns a HandlerScheduler object
  public static Scheduler mainThread() {
      return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
  }
}
final class HandlerScheduler extends Scheduler {
  private final Handler handler;
  HandlerScheduler(Handler handler) {
      this.handler = handler;
  }
  @Override
  public Worker createWorker() {
      return new HandlerWorker(handler);
  }

  private static final class HandlerWorker extends Worker {
      @Override
      public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
          // ...
          Message message = Message.obtain(handler, scheduled);
          message.obj = this; // Used as token for batch disposal of this worker's runnables.
          // The Handler does not override handleMessage(), so how does the Runnable get called?
          // The answer is in the Handler source: a Message that carries a callback Runnable
          // is dispatched by running that callback.
          handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
          // ...
      }
  }
}
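The schedule()/drain pair in ObserveOnObserver is a queue-drain pattern: producers enqueue values and bump an atomic counter, and only the caller that raises the counter from 0 schedules a drain on the target thread. Below is a minimal sketch of that pattern in plain Java. QueueDrainer and the thread name "mini-main" are hypothetical, a single-thread executor stands in for the Android main thread and its Handler, and termination, errors, and disposal are not modeled.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class MiniObserveOnDemo {
    // Like ObserveOnObserver: queues values and drains them on the target executor.
    static final class QueueDrainer<T> extends AtomicInteger implements Runnable {
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final Executor worker;
        final Consumer<? super T> downstream;

        QueueDrainer(Executor worker, Consumer<? super T> downstream) {
            this.worker = worker;
            this.downstream = downstream;
        }

        void onNext(T value) {
            queue.offer(value);
            schedule();
        }

        void schedule() {
            // Only the caller that raises the counter from 0 starts a drain run.
            if (getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    downstream.accept(v);
                }
                // Subtract the requests handled so far; a non-zero result means
                // more onNext calls arrived while draining, so loop again.
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    static List<String> run() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-main"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        QueueDrainer<Integer> drainer = new QueueDrainer<Integer>(main, v -> {
            seen.add(v + "@" + Thread.currentThread().getName());
            done.countDown();
        });
        for (int i = 1; i <= 3; i++) {
            drainer.onNext(i); // produced on the calling thread
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            main.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The counter guarantees that at most one drain loop runs at a time, so the downstream callback is never invoked concurrently even if several threads call onNext().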