This section describes the Cache operator functions

Caches the arguments passed from the previous operation through onNext, and on the next subscribe, skip the previous step and execute the next step directly.

use

It may be difficult for you to understand the function of Cache only in text. You can imagine, for example, an HTTP request, where I have a delay, and if I didn’t have a cache, every time I subscribe I would actually initiate an HTTP request. But if you add a cache operator to the end, then the next time you subscribe, you actually get it from the cache, without actually making an HTTP request.

Let’s do a little code
  var handler = Handler()

// Cache becomes more effective after a delay

// observable = Observable.create(object : Observable.OnSubscribe<String> {

// override fun call(t: Subscriber<in String>) {

// t.onNext("Test1")

// t.onNext("Test2")

// t.onNext("Test3")

// }

// }).delay(4, TimeUnit.SECONDS).cache()

observable = Observable.create(object : Observable.OnSubscribe<String> {

override fun call(t: Subscriber<in String>) {

t.onNext("Test1")

t.onNext("Test2")

t.onNext("Test3")

}

}).cache()



btSub.setOnClickListener({

observable? .subscribe({ msg ->

handler.post(Runnable {

tvContent.text = tvContent.text.toString() + "\n" + msg



})



})

})

Copy the code

The cache is delayed by 4 seconds after the first click of the subscribe button triggered by Test1,Test2,Test3, second, third… Delay operator Test1,Test2,Test3, delay operator Test1,Test2,Test3, delay operator

With such a long string of introductions, I think you are rightcacheOperators have a certain understanding, so we take their own minds of doubt and guess, in accordance with the order of demo step by step into the source code.

###### Take a look at the source Observable

public final Observable<T> cache() {

return CachedObservable.from(this);

}

Copy the code

CachedObservable

public static <T> CachedObservable<T> from(Observable<? extends T> source) {

return (CachedObservable<T>)from(source, 16);

}

public static <T> CachedObservable<T> from(Observable<? extends T> source, int capacityHint) {

if (capacityHint < 1) {

throw new IllegalArgumentException("capacityHint > 0 required");

}

CacheState<T> state = new CacheState<T>(source, capacityHint);

CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);

return new CachedObservable<T>(onSubscribe, state);

}

Copy the code

Personal understanding: because actually has been introduced to the front, the cache is passed down in front of the parameter, keep up, so is certainly need to have an array or a list, here I see capacityHint, I think CacheState should be the container. It’s not the most important. In Observable subclasses, the two most important things to look at are OnSubscribe,Subscriber, and so on

CachedObservable.CacheState

static final class CacheState<T> extends LinkedArrayList implements Observer<T> {

.

}

Copy the code

CachedObservable.CachedSubscribe

static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {

.

}

Copy the code

OnSubscribe is pretty obvious, CachedSubscribe. Is CacheState Subscriber? , where CacheState implements the Observer interface Observer

public interface Observer<T> {    

void onCompleted();  

  void onError(Throwable e);    

void onNext(T t);

}

Copy the code

As we have introduced in several previous articles, Subscriber is actually onNext.. And other methods of concrete implementation. So we can view CacheState directly as Subscriber, which helps us a lot.

 public abstract class Subscriber<T> implements Observer<T>, Subscription {

.

}

Copy the code

Subscriber implements the Observer interface. Subscriber can subscribe,unsubscribe, and onStart methods. In the RxJava operator, we are interested in the onNext method of Subscrber and the call method of OnSubscribe, so we have a deep understanding of this operator.

Now that we’ve found two important objects, let’s dig deeper. Observable. Cache gets a CachedObservable, and then clicks subscribe Observable

.

public final Subscription subscribe(final Action1<? super T> onNext) {

if (onNext == null) {

throw new IllegalArgumentException("onNext can not be null");

}



Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;

Action0 onCompleted = Actions.empty();

return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));

}

.

public final Subscription subscribe(Subscriber<? super T> subscriber) {

return Observable.subscribe(subscriber, this);

}

.

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

.

subscriber.onStart();

if (! (subscriber instanceof SafeSubscriber)) {

subscriber = new SafeSubscriber<T>(subscriber);

}

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

.

Copy the code

I’m going to post this piece of code, which I’ve actually posted in the last couple of articles, but I’m going to post it again so you can remember it better. Call the subscribe method, is actually obsevable. OnSubscibe. The call (the subscriber) key local, the obsevable here, onSubscribe, what are the subscriber? The more operators you use in a chain, the harder it will be at the end, so we only have one operator here, Observable is CachedObservable and obsevable. OnSubscibe is CachedSubscribe. If you assume subscriber is the same as the CacheState, you are wrong. So what exactly is subscriber? So let’s start with the code and work our way up, and we’ll see. Subscriber is a parameter passed in from outside,

.

if (! (subscriber instanceof SafeSubscriber)) {

subscriber = new SafeSubscriber<T>(subscriber);

}

.

Copy the code

In fact, we have been told that the final subscriber is actually a SafeSubscriber. The original subscriber is saved in SafeSubscriber as a construction parameter.

So let’s go ahead and see what subscriber was.

.

public final Subscription subscribe(final Action1<? super T> onNext) {

.

return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));

}

.

Copy the code

Obviously, the Action we passed in the demo was wrapped as ActionSubscriber.

Look at the picture speak

So it ends up being this subscriber.

So let’s go further.

observable.onSubscriber.call(subscriber)

static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {

.

@Override

public void call(Subscriber<? super T> t) {

ReplayProducer<T> rp = new ReplayProducer<T>(t, state);

state.addProducer(rp);

t.add(rp);

t.setProducer(rp);

if (! get() && compareAndSet(false, true)) {

state.connect();

}

}

}

Copy the code

Let’s take it line by line.

  1. createReplayProducerobject
       static final class ReplayProducer<T> extends AtomicLong implements Producer, Subscription {

    .

    public ReplayProducer(Subscriber<? super T> child, CacheState<T> state) {

    this.child = child;

    this.state = state;

    }

    @Override

    public void request(long n) {

    .

    }

    .

    Copy the code

ReplayProducer is a Producer. We’ve seen before that Producer. We just need to look at the request method. So let’s take a look at its request.

  1. theReplayProducerAdded to theCacheStateIn the object
    public void addProducer(ReplayProducer<T> p) {

    synchronized (connection) {

    ReplayProducer<? >[] a = producers;

    int n = a.length;

    ReplayProducer<? >[] b = new ReplayProducer<? >[n + 1];

    System.arraycopy(a, 0, b, 0, n);

    b[n] = p;

    producers = b;

    }

    }

    Copy the code

We’re using an array to add a new element, which is pretty simple, but I won’t go into that.

  1. theReplyProducerAs aSubscriptionandSubscriberBind together.
    public final void add(Subscription s) {

    subscriptions.add(s);

    }

    .

    @Override

    public final void unsubscribe() {

    subscriptions.unsubscribe();

    }

    .

    Copy the code

It can be understood that when subscriber. Unsubscribe, the corresponding Producer cancels the subscription. This kind of operation also occurs in the previous operators.

  1. callProducertherequestmethods

    The setProducer operation, as we’ve seen before, is a direct callProducerThe request method

Note: since we only use one operator now, it is relatively simple, so we will understand this first. In the future, we will talk about the mixed use of multiple operators, which may appear more complicated.

.

public void request(long n) {

for (;;) {

long r = get();

if (r < 0) {

return;

}

long u = r + n;

if (u < 0) {

u = Long.MAX_VALUE;

}

if (compareAndSet(r, u)) {

replay();

return;

}

}

}

public void replay() {

synchronized (this) {

if (emitting) {

missed = true;

return;

}

emitting = true;

}

boolean skipFinal = false;

try {

final Subscriber<? super T> child = this.child;



for (;;) {



long r = get();



if (r < 0L) {

skipFinal = true;

return;

}

int s = state.size();

if (s ! = 0) {

// This code is presented for analysis when read

.



}

synchronized (this) {

if (! missed) {

emitting = false;

skipFinal = true;

return;

}

missed = false;

}

}

} finally {

if (! skipFinal) {

synchronized (this) {

emitting = false;

}

}

}

}

}

.

Copy the code

In the request method, the replay method is called. The most important sentence in it is

int s = state.size();

if (s ! = 0) {

.

}

Copy the code

The first call to subscribe didn’t cache anything at all, so state.size()==0, so the first call to replay basically did nothing.

  1. First callsubscribeperform
    if (! get() && compareAndSet(false, true)) {

    state.connect();

    }

    Copy the code

Let’s see first! Get () && compareAndSet(false, true), because CachedSubscribe

extends AtomicBoolean

Get ()==false the first time, and then set to true, ensures that state.connect is not called the next time

Let’s look at the CONNECT method in detail. The most important caching operations are here

.

public void connect() {

Subscriber<T> subscriber = new Subscriber<T>() {

@Override

public void onNext(T t) {

CacheState.this.onNext(t);

}

@Override

public void onError(Throwable e) {

CacheState.this.onError(e);

}

@Override

public void onCompleted() {

CacheState.this.onCompleted();

}

};

connection.set(subscriber);

source.unsafeSubscribe(subscriber);

isConnected = true;

}

.

Copy the code

Here, let’s do it step by step

  1. To create aSubscriber

    Because up front, we’re actually rightcacheThe function has been known, that is, after the first time, to call againsubscribeYou just take the cached data and pass it on.

In this case, we can see that the newly created Subscriber is the one to be directly transferred. And we can actually draw a rough picture here.

Look at the picture speak

The first subscribe is realized by going above, and the second subscribe directly goes below the dotted line, and the Subscriber in the middle is the one we are talking about now. After the second time, I’m going to jump right over OnSubscribe in the demo.

Note: There is no reference to the Subscriber in the demo. According to the previous analysis, we know that the Subscriber in the demo has been wrapped as SafeSubscriber and stored in the child variable in ReplyProducer

  1. savesubscriber

    Now that I mentioned, every operation after the first one will use thissubscriberSo we definitely have to put thissubscriberSave it.
    public void set(Subscription s) {

    .

    state.update(s);

    }

    Copy the code

SequentialSubscription

.

public boolean update(Subscription next) {

for (;;) {

Subscription current = get();



if (current == Unsubscribed.INSTANCE) {

if (next ! = null) {

next.unsubscribe();

}

return false;

}



if (compareAndSet(current, next)) {

if (current ! = null) {

current.unsubscribe();

}

return true;

}

}

}

.

Copy the code

As we expected, the subscriber was indeed saved. Next, compareAndSet (current).

  1. For the first time,subscribeAgain, call the previous oneonSubscribethe
    source.unsafeSubscribe(subscriber);

    Copy the code

In fact, is

source.onSubscribe.call(subscriber);

Copy the code

Source is the Observable object passed in to our demo when we create CachedObservable. Create () OnSubscribe is just like onSubscribe in the demo. Now we still focus on subscriber.

In this case, the subscriber we can easily identify from the above code is the newly released one, and onNext is directly called onNext of CacheState.

CachedObservable.CacheState

.

public void onNext(T t) {

if (! sourceDone) {

Object o = NotificationLite.next(t);

add(o);

dispatch();

}

}

.

Copy the code

When we call a subscriber in the demo OnSubscribe. OnNext, is actually to enter CachedObservable. CacheState onNext.

Here we go again, sentence by sentence.

  1. NotificationLite.next(t)
public static <T> Object next(T t) {

if (t == null) {

return ON_NEXT_NULL_SENTINEL;

} else {

return t;

}

}

Copy the code

The argument is obviously null.

  1. add(o)

We mentioned earlier that CacheState is used to hold the parameters passed above. The implementation is right here, add(o).

  1. dispatch()
void dispatch() {

ReplayProducer<? >[] a = producers;

for (ReplayProducer<? > rp : a) {

rp.replay();

}

}

Copy the code

Obviously the most important thing here is rp.replay(). Let’s take a look at what rp.replay does regardless of why the for loop is used.

 public void replay() {

.

int s = state.size();

if (s ! = 0) {

.

if (NotificationLite.accept(child, o)) {

.

}

}

.

}

Copy the code

NotificationLite.accept

public static <T> boolean accept(Observer<? super T> o, Object n) {

.

o.onNext((T) n);

return false;

.

}

Copy the code

Reply is to determine if there is a cache for CacheState and then call child.onNext(o) directly.

So what is a child? That’s what we analyzed

Calling onNext layer by layer eventually leads to our own Action.

Second callsubscribe

We’ve already gone through the whole process of calling SUBSCRIBE for the first time, so let’s just go through the process of calling SUBSCRIBE for the second time.

We start with CachedSubscribe’s call method.

public void call(Subscriber<? super T> t) {

ReplayProducer<T> rp = new ReplayProducer<T>(t, state);

state.addProducer(rp);

t.add(rp);

t.setProducer(rp);

if (! get() && compareAndSet(false, true)) {

state.connect();

}

}

Copy the code
  1. I create another oneReplayProducer
  2. Added to theCacheStateIn the
  3. theReplayProducerTo join theSafeSubscriberAre bound together
  4. callProducertherequestMethod, in this case, is actually a callReplayProducerthereplayMethods (previously analyzed)
  5. Because the first time, it was already set totruesoget()==trueJust skip

Therefore, in summary, we still directly look at ReplayProducer’s replay method.

 public void replay() {

.

int s = state.size();

if (s ! = 0) {

.

if (NotificationLite.accept(child, o)) {

.

}

}

.

}

Copy the code

NotificationLite.accept

public static <T> boolean accept(Observer<? super T> o, Object n) {

.

o.onNext((T) n);

return false;

.

}

Copy the code

Because the first time we saved all the emitted objects in CacheState. In our current demo, Test1,Test2, and Test33 strings are stored in state and SafeSubsciber’s onNext method is called directly.

conclusion

In general, the cache operator actually looks more difficult in the source code than the first two operators, but with the first two operators in the background, it is relatively easy.

additional

We already have a general understanding of the overall cache flow, but we still have a problem. Let’s look at this code again.

void dispatch() {

ReplayProducer<? >[] a = producers;

for (ReplayProducer<? > rp : a) {

rp.replay();

}

}

Copy the code

Why is there a for loop here?

In fact, we can see it from the picture above. Test1,Test2,Test3 are repeated three times. Because I clicked the SUBSCRIBE button 3 times in a row in the demo. Subscribe 3 times, adding 3 ReplayProducer to CacheState, so when we call our Dispatch method, the for loop calls replay 3 times, and then outputs 3 times.


Please give me a thumbs-up if you like it

WeChat pay

Alipay