This series of articles was originally published on Jianshu. I recently moved to Juejin, and future installments will be published there.

The previous article briefly covered some advantages and disadvantages of an Event/Rx bus. It also described how to use RxJava "correctly" rather than reinventing the wheel with RxBus.

Why not use EventBus/RxBus

It also showed a simple way to wrap an Observable using the create() method. However, that approach leaves many pitfalls, such as memory leaks and the inability to multicast (multiple Subscribers subscribing to the same Observable). So in this article, we will use that example to look at how an Observable should be encapsulated.

1. What do static methods do in Observable?

First, let's take a quick look at how the static methods just/from/create provide you with an Observable. Let's start with just:

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

We don't need to worry about the RxJavaPlugins.onAssembly() method for now. More importantly, the just(T item) method gives you an instance of ObservableJust, which is an implementation class inside RxJava. In RxJava 2.x, Observable is an abstract class with only one abstract method, subscribeActual(Observer observer). (The Observable source file is 13,518 lines long!)

public abstract class Observable<T> implements ObservableSource<T>{
  //implemented methods

  protected abstract void subscribeActual(Observer<? super T> observer);

  //other implements/operators
}

So what exactly is ObservableJust?

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

First, we see that the constructor simply stores value in a field of ObservableJust. That is why any expression passed to Observable.just() is evaluated immediately, rather than when a Subscriber arrives, as with create(). Now let's look at Observable.just(T item1, T item2):

public static <T> Observable<T> just(T item1, T item2) {
    ObjectHelper.requireNonNull(item1, "The first item is null");
    ObjectHelper.requireNonNull(item2, "The second item is null");

    return fromArray(item1, item2);
}

Huh? Why the sudden change of style? Where did ObservableJust go? In fact, every just overload except the single-item one calls fromArray. So let's look at fromArray:

public static <T> Observable<T> fromArray(T... items) {
    //NullCheck
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

We skip the null checks at the top, but what follows looks familiar: ObservableFromArray(items), another Observable implementation class.

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        // implementation omitted
    }
}

Does this look even more familiar? Almost all of Observable's static methods and operators work this way, and even well-known RxJava libraries such as RxBinding use the same encapsulation: the library implements Observable's subscribeActual() method internally and exposes only static methods that create the Observable for you. To see why, let's look at the subscribeActual() method.

2. What exactly is subscribeActual()?

subscribeActual() is the bridge between Observable and Observer. The Observer (Subscriber) is the class you pass to Observable.subscribe(), or a Consumer (which handles only onNext).

public final void subscribe(Observer<? super T> observer) {
    // null check and plugin hook omitted
    subscribeActual(observer);
}

We can see that, apart from the null check and the plugin hook, this method has only one line, subscribeActual(observer), which connects the Observable to the Observer. So we know that the code in subscribeActual() runs only after subscribe() is called.
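The delegation described above can be sketched in plain Java. This is a minimal illustration, not RxJava's code: MiniObservable, MiniObserver and MiniJust are hypothetical stand-ins that keep only the parts discussed so far (a static factory, one abstract subscribeActual(), and a subscribe() that delegates to it).

```java
import java.util.ArrayList;
import java.util.List;

// A simplified sketch: MiniObservable, MiniObserver and MiniJust are
// hypothetical stand-ins, not RxJava classes.
class SubscribeActualSketch {

    interface MiniObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    abstract static class MiniObservable<T> {
        // The single abstract method, playing the role of subscribeActual().
        protected abstract void subscribeActual(MiniObserver<? super T> observer);

        // subscribe() validates its argument and then only delegates.
        public final void subscribe(MiniObserver<? super T> observer) {
            if (observer == null) {
                throw new NullPointerException("observer is null");
            }
            subscribeActual(observer);
        }

        // Static factory: callers never name the implementation class.
        public static <T> MiniObservable<T> just(T item) {
            if (item == null) {
                throw new NullPointerException("The item is null");
            }
            return new MiniJust<T>(item);
        }
    }

    static final class MiniJust<T> extends MiniObservable<T> {
        private final T value;

        MiniJust(T value) {
            this.value = value;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super T> observer) {
            observer.onNext(value);
            observer.onComplete();
        }
    }

    // Records the order of events so the laziness of emission is visible.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.just("hello");
        log.add("assembled"); // nothing has been emitted yet
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext:" + item); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log shows "assembled" before any onNext entry: creating the source emits nothing, and events flow only once subscribe() hands the observer to subscribeActual().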

So how are they linked? Simply by calling observer.onXxx() step by step according to your logic. Take the run() method invoked by ObservableJust, for example:

@Override
public void run() {
    if (get() == START && compareAndSet(START, ON_NEXT)) {
        observer.onNext(value);
        if (get() == ON_NEXT) {
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}

Another example is our ObservableFromArray:

void run() {
    T[] a = array;
    int n = a.length;

    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[i];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}

A more complex example is wrapping a button's OnClick event:

@Override protected void subscribeActual(Observer<? super Object> observer) {
  if (!checkMainThread(observer)) {
    return;
  }
  Listener listener = new Listener(view, observer);
  observer.onSubscribe(listener);
  view.setOnClickListener(listener);
}

static final class Listener extends MainThreadDisposable implements OnClickListener {
  private final View view;
  private final Observer<? super Object> observer;

  Listener(View view, Observer<? super Object> observer) {
    this.view = view;
    this.observer = observer;
  }

  @Override public void onClick(View v) {
    if (!isDisposed()) {
      observer.onNext(Notification.INSTANCE);
    }
  }

  @Override protected void onDispose() {
    view.setOnClickListener(null);
  }
}

If you look carefully, you will find the call observer.onSubscribe(disposable) in every subscribeActual() method. So what does it do? According to the Observable Contract, onSubscribe signals that the Observer is ready to receive items, and it is also how the Disposable is passed back to the Subscriber. Disposable is essentially the handle that lets you cancel a subscription. It has only two methods: dispose() to unsubscribe and isDisposed() to report whether the subscription has already been cancelled. When unsubscribing, release resources as needed. The logic in subscribeActual() must also be consistent: for example, no onNext() may follow onComplete(). There are many such details to get right, which may be why RxJava recommends using the static methods to create Observables. If you are interested, you can read the contract directly:

Observable Contract
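The role of the Disposable can be sketched as follows. This is an illustration under stated assumptions, not RxJava's code: SimpleDisposable, SimpleObserver and ArrayEmission are hypothetical names, and the loop only mirrors the shape of the ObservableFromArray run() method shown earlier. The observer receives the Disposable in onSubscribe before any item arrives, and the emission loop checks isDisposed() before each event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for Disposable and Observer; not RxJava classes.
class DisposableSketch {

    interface SimpleDisposable {
        void dispose();
        boolean isDisposed();
    }

    interface SimpleObserver<T> {
        void onSubscribe(SimpleDisposable d);
        void onNext(T item);
        void onComplete();
    }

    // Emits the array items in order and stops as soon as it is disposed.
    static final class ArrayEmission<T> implements SimpleDisposable {
        private final SimpleObserver<? super T> observer;
        private final T[] items;
        private final AtomicBoolean disposed = new AtomicBoolean();

        ArrayEmission(SimpleObserver<? super T> observer, T[] items) {
            this.observer = observer;
            this.items = items;
        }

        @Override public void dispose() { disposed.set(true); }

        @Override public boolean isDisposed() { return disposed.get(); }

        void run() {
            for (int i = 0; i < items.length && !isDisposed(); i++) {
                observer.onNext(items[i]);
            }
            if (!isDisposed()) {
                observer.onComplete();
            }
        }
    }

    // Plays the role of subscribeActual(): hand over the Disposable, then emit.
    static <T> void subscribe(T[] items, SimpleObserver<? super T> observer) {
        ArrayEmission<T> emission = new ArrayEmission<T>(observer, items);
        observer.onSubscribe(emission);
        emission.run();
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        subscribe(new Integer[] {1, 2, 3, 4}, new SimpleObserver<Integer>() {
            private SimpleDisposable upstream;

            @Override public void onSubscribe(SimpleDisposable d) {
                upstream = d;
                log.add("onSubscribe");
            }

            @Override public void onNext(Integer item) {
                log.add("onNext:" + item);
                if (item == 2) {
                    upstream.dispose(); // cancel after the second item
                }
            }

            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the observer disposes after the second item, items 3 and 4 are never delivered and onComplete is skipped as well.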

3. Observable.create()

The create() method is a relic of history. Because of its name, many people assume that Observable.create() should be the first choice for creating an Observable. That was a mistake in RxJava 1.x, where Observable.create() was widely criticized. It is not that create() is bad; it is hard to use correctly. RxJava relies on the Observable Contract to behave as expected, and the 1.x create() lets you ignore the contract entirely: you can keep sending onNext events after onComplete, and the downstream will still receive them. To use Observable.create() properly in 1.x, you first have to understand almost all of the rules. For this reason, using Observable.create() in RxJava 1.x is discouraged. (In RxJava 1.3, the create() method is marked @Deprecated.)

After this failure in 1.x, RxJava 2.x provides a safe create() method. Instead of handing you the Observer directly, it uses ObservableEmitter as an intermediary, so even if the code in your emitter does not follow the Observable Contract, the downstream still behaves as expected.
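The idea of an emitter as an intermediary can be sketched like this. It is a simplified, single-threaded illustration rather than RxJava's implementation: GuardedEmitter, SimpleObserver and this create() helper are hypothetical names. The point is only that user code talks to a wrapper, and the wrapper refuses to forward anything after a terminal event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded sketch of a "safe" create(); not RxJava code.
class SafeEmitterSketch {

    interface SimpleObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    // What user code sees instead of the raw observer.
    static final class GuardedEmitter<T> {
        private final SimpleObserver<? super T> downstream;
        private boolean done;

        GuardedEmitter(SimpleObserver<? super T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            if (!done) {
                downstream.onNext(item);
            }
        }

        void onComplete() {
            if (!done) {
                done = true; // every later event is dropped
                downstream.onComplete();
            }
        }
    }

    // Runs the user-written source against a guarded emitter.
    static <T> void create(Consumer<GuardedEmitter<T>> source,
                           SimpleObserver<? super T> observer) {
        source.accept(new GuardedEmitter<T>(observer));
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        SafeEmitterSketch.<String>create(emitter -> {
            emitter.onNext("a");
            emitter.onComplete();
            emitter.onNext("b");   // violates the contract, so it is dropped
            emitter.onComplete();  // dropped as well
        }, new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext:" + item); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even though the source misbehaves, the downstream sees one item followed by a single onComplete.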

4. About operators

just, from, create, and so on are operators that create Observables. How do operators like map and filter differ from them? Let's look at the source, starting with map:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

filter:

public final Observable<T> filter(Predicate<? super T> predicate) {
    ObjectHelper.requireNonNull(predicate, "predicate is null");
    return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}

As we can see, the difference is that creating the new Observable requires two arguments: the Observable itself (this in the code) and the implementation of the interface that performs the operation. (Some operators take more parameters, such as a Scheduler, but they work the same way, so we will not go into them.) The Observable itself is what we call the upstream. Upstream and downstream are defined relative to an operator: everything before the operator is upstream, and everything after it is downstream. Take map, for example:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    // fields and constructor omitted

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}

source is our upstream. MapObserver is the middleman (the operator itself): it processes the data as required and sends it downstream. Operators can be very complex, and map is the simplest of them. If you are interested, I recommend studying publish(selector) and other complex operators to understand operators in more depth. Readers with perseverance can also follow the blog series by RxJava's lead developer. It is entirely in English and very hard to follow, not because of the English but because the principles themselves are difficult.

Advanced Reactive Java
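The upstream/middleman/downstream relationship can be sketched in plain Java. This is a minimal illustration, not RxJava's ObservableMap: SimpleSource and SimpleObserver are hypothetical stand-ins, and this MapObserver keeps only the forwarding logic described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins that mimic the shape of a map operator; not RxJava code.
class MapOperatorSketch {

    interface SimpleObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    interface SimpleSource<T> {
        void subscribe(SimpleObserver<? super T> observer);
    }

    // The middleman: an observer of the upstream that feeds the downstream.
    static final class MapObserver<T, U> implements SimpleObserver<T> {
        private final SimpleObserver<? super U> downstream;
        private final Function<? super T, ? extends U> mapper;

        MapObserver(SimpleObserver<? super U> downstream,
                    Function<? super T, ? extends U> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override public void onNext(T item) {
            downstream.onNext(mapper.apply(item));
        }

        @Override public void onComplete() {
            downstream.onComplete();
        }
    }

    // Subscribing to the result subscribes a MapObserver to the upstream.
    static <T, U> SimpleSource<U> map(SimpleSource<T> upstream,
                                      Function<? super T, ? extends U> mapper) {
        return downstream -> upstream.subscribe(new MapObserver<T, U>(downstream, mapper));
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        SimpleSource<Integer> numbers = observer -> {
            observer.onNext(1);
            observer.onNext(2);
            observer.onComplete();
        };
        SimpleSource<String> labels = map(numbers, n -> "#" + n);
        labels.subscribe(new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext:" + item); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

From the point of view of map, numbers is the upstream and the anonymous observer is the downstream; the operator never emits on its own, it only transforms what passes through it.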

About lift

Those of you who have read rengwuxian's introductory RxJava article should already have an idea of lift. Almost all RxJava 1.x operators are built on lift, but in RxJava 2.x lift is almost invisible. It now serves only as an interface for providing custom operators (although the simpler, easier-to-use compose is preferred, because lift requires you to override seven abstract methods). A few final points:

  • Flowable: Flowable is implemented much like Observable, except that Observable uses Disposable to cancel a subscription, while Flowable uses Subscription, which also requires the request() method to control the flow. For details, I recommend this article:

    RxJava2.0 tutorial for beginners

Conclusion:

  • From the perspective of the source code, RxJava 2.x links Observable and Observer (Subscriber) through subscribeActual(), which in essence is not much different from a Listener. However, RxJava is the work of many front-line Java/Android developers, and it offers rich operators, thread scheduling, and many other advantages, all while being type-safe. Thanks again to them; we are standing on their shoulders.

Reactive Streams and RxJava

RxJava is not the only implementation of reactive programming in Java. Other well-known ones include Project Reactor and Google Agera. All things considered, though, RxJava is the best choice on Android in terms of performance and extensibility. Because all of these libraries run on the JVM, their authors decided to unify the interfaces, so Reactive Streams defines a set of basic interfaces:

// Corresponds to Flowable in RxJava
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

// RxJava has no single direct counterpart, only various implementation classes
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

// RxJava uses Subscription directly in Flowable
public interface Subscription {
    public void request(long n);
    public void cancel();
}

// The Flowable version of Subject
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

The naming of these four interfaces is why the RxJava 1.x Observable was renamed Flowable, while the RxJava 2.x Observable has no backpressure support at all. To avoid naming conflicts, Subscription became Disposable and Subscriber became Observer.
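To make request() concrete, here is a small sketch that uses java.util.concurrent.Flow, the JDK's own copy of these four interfaces (Java 9 and later), so that it runs without any library. RangePublisher and RangeSubscription are hypothetical names, the code is single-threaded, and it deliberately ignores parts of the specification (for example, a non-positive request should be reported through onError). It is not how Flowable is implemented; it only shows that the publisher emits no more items than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded sketch of request-driven emission.
class FlowRequestSketch {

    // Emits 1..count, but only as many items as have been requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new RangeSubscription(subscriber, count));
        }
    }

    static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private int next = 1;
        private long requested;
        private boolean emitting;
        private boolean cancelled;
        private boolean done;

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            // Simplification: invalid requests are ignored instead of signalled.
            if (n <= 0 || cancelled || done) {
                return;
            }
            // Add to the outstanding demand, capping at Long.MAX_VALUE.
            requested = (Long.MAX_VALUE - requested < n) ? Long.MAX_VALUE : requested + n;
            if (emitting) {
                return; // re-entrant call from onNext: the running loop picks it up
            }
            emitting = true;
            while (requested > 0 && next <= count && !cancelled) {
                requested--;
                subscriber.onNext(next++);
            }
            emitting = false;
            if (next > count && !cancelled) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    // A subscriber that asks for two items at a time.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        new RangePublisher(5).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(2)");
                s.request(2);
            }

            @Override public void onNext(Integer item) {
                log.add("onNext:" + item);
                if (++receivedInBatch == 2) {
                    receivedInBatch = 0;
                    log.add("request(2)");
                    subscription.request(2);
                }
            }

            @Override public void onError(Throwable t) { log.add("onError"); }

            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The log alternates between a request(2) entry and at most two onNext entries, which is the flow control that Subscription adds on top of what Disposable offers.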