RxJava has been hugely popular this year, and one of its most impressive features is its operators. Until recently I only knew how to use them. Over the past few days I read the source code and worked out roughly how an operator works, and today I'd like to share that with you. If anything here is wrong, corrections are welcome. We'll take filter as the example. Here is the code:

Integer[] datas = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Observable.from(datas)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                // keep only values greater than or equal to 5
                return integer >= 5;
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                mText.append(integer.toString() + ",");
            }
        });

This small example uses the filter operator to keep only the numbers greater than or equal to 5. Let's open the source code and see what filter does.

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
    return create(new OnSubscribeFilter<T>(this, predicate));
}

It calls create(). That should look familiar: create() is what we use when we build an Observable ourselves. So filter() creates a new Observable and returns it, which means our subscriber actually subscribes to this new Observable rather than to the one we wrote. The create() method takes an OnSubscribe object, so OnSubscribeFilter must be an implementation of OnSubscribe. Let's check.
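Before looking at the real class, here is a minimal sketch of the create()/subscribe() relationship just described. It does not use RxJava: MiniObservable, MiniOnSubscribe and MiniSubscriber are hypothetical, heavily simplified stand-ins invented for illustration, and the real classes do considerably more (hooks, backpressure, unsubscription).

```java
import java.util.ArrayList;
import java.util.List;

public class CreateSketch {
    /** Hypothetical stand-in for rx.Subscriber, reduced to onNext(). */
    interface MiniSubscriber<T> {
        void onNext(T value);
    }

    /** Hypothetical stand-in for Observable.OnSubscribe. */
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    /** Hypothetical stand-in for rx.Observable: it only stores an OnSubscribe. */
    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // create() does no work; it just wraps the OnSubscribe in a new observable.
        static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
            return new MiniObservable<T>(onSubscribe);
        }

        // subscribe() is the moment the stored OnSubscribe finally runs.
        void subscribe(MiniSubscriber<T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    /** Records the order in which things happen. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniObservable<Integer> observable = MiniObservable.<Integer>create(subscriber -> {
            events.add("call");
            subscriber.onNext(5);
        });
        events.add("created");
        observable.subscribe(value -> events.add("onNext " + value));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [created, call, onNext 5]
    }
}
```

In this sketch nothing is emitted until subscribe() is called. OnSubscribeFilter, shown next, is the OnSubscribe that filter() hands to create().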

public final class OnSubscribeFilter<T> implements OnSubscribe<T> {

    final Observable<T> source;

    final Func1<? super T, Boolean> predicate;

    public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

As expected, OnSubscribeFilter is an implementation of OnSubscribe. Its constructor takes two parameters, an Observable and a Func1. The first is the Observable we created ourselves (filter() passes this), and the second is the Func1 we wrote outside. Both are simply stored in fields. We all know that calling subscribe() invokes the call() method of the OnSubscribe. So what does OnSubscribeFilter's call() method do?

    @Override
    public void call(final Subscriber<? super T> child) {
        FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
        child.add(parent);
        source.unsafeSubscribe(parent);
    }

A FilterSubscriber appears here. Let's see what it is:

    static final class FilterSubscriber<T> extends Subscriber<T> {

        final Subscriber<? super T> actual;

        final Func1<? super T, Boolean> predicate;

        boolean done;

        public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
            this.actual = actual;
            this.predicate = predicate;
            request(0);
        }

        @Override
        public void onNext(T t) {
            boolean result;

            try {
                result = predicate.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            if (result) {
                actual.onNext(t);
            } else {
                request(1);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            super.setProducer(p);
            actual.setProducer(p);
        }
    }

It is a subclass of Subscriber. Its constructor takes two parameters, a Subscriber and a Func1. The Subscriber is the real observer we passed in from outside (the child argument of call()). The Func1 is the one handed to OnSubscribeFilter when it was created, that is, the Func1 we defined outside. Now go back to OnSubscribeFilter's call() method. It calls source.unsafeSubscribe(parent), which subscribes the FilterSubscriber to source, our original Observable. In FilterSubscriber's onNext() method, the return value of predicate.call(t) decides whether onNext() is called on the real observer outside. When the value is rejected, request(1) asks the source for one more item to take its place.
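The wrapping idea behind FilterSubscriber can be sketched without RxJava. The block below is a simplified illustration, not the library's code: MiniSubscriber and FilteringSubscriber are hypothetical names, java.util.function.Predicate stands in for Func1, and backpressure (request) and unsubscription are left out. A done flag stands in for unsubscribe() after a failing predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterSubscriberSketch {
    /** Hypothetical stand-in for rx.Subscriber (no backpressure, no unsubscribe). */
    interface MiniSubscriber<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Wraps the real subscriber and forwards only values the predicate accepts. */
    static final class FilteringSubscriber<T> implements MiniSubscriber<T> {
        private final MiniSubscriber<T> actual;
        private final Predicate<T> predicate;
        private boolean done;

        FilteringSubscriber(MiniSubscriber<T> actual, Predicate<T> predicate) {
            this.actual = actual;
            this.predicate = predicate;
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return; // simplification: stands in for unsubscribe() after a failure
            }
            boolean accepted;
            try {
                accepted = predicate.test(value);
            } catch (RuntimeException ex) {
                onError(ex); // a failing predicate ends the stream with an error
                return;
            }
            if (accepted) {
                actual.onNext(value);
            }
        }

        @Override
        public void onError(Throwable error) {
            if (done) {
                return;
            }
            done = true;
            actual.onError(error);
        }
    }

    /** Pushes the given values through a FilteringSubscriber and records what comes out. */
    static List<String> demo(Predicate<Integer> predicate, int... values) {
        List<String> received = new ArrayList<>();
        MiniSubscriber<Integer> actual = new MiniSubscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
                received.add("onNext " + value);
            }

            @Override
            public void onError(Throwable error) {
                received.add("onError " + error.getMessage());
            }
        };
        MiniSubscriber<Integer> parent = new FilteringSubscriber<Integer>(actual, predicate);
        for (int value : values) {
            parent.onNext(value);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(value -> value >= 5, 3, 7, 5, 1)); // prints [onNext 7, onNext 5]
    }
}
```

The source only ever talks to the wrapper, and the real subscriber sees just the values that pass the predicate.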

If that was hard to follow, here is the overall flow. When we call subscribe(), our Subscriber does not subscribe to the Observable we wrote, but to the new Observable returned by filter(). Subscribing therefore invokes the call() method of OnSubscribeFilter, which is the OnSubscribe of that new Observable. Inside call(), our Subscriber is wrapped in a FilterSubscriber, and the FilterSubscriber is subscribed to the original Observable, the one we created outside. As a result, the subscriber received in the call() method of the original Observable's OnSubscribe is the FilterSubscriber, and every onNext() we invoke there goes to FilterSubscriber's onNext(). That method evaluates the Func1 we passed in and calls the real Subscriber's onNext() only when it returns true, which is how the filtering happens. That is the whole flow of filter.
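The whole chain can be sketched end to end in plain Java. This is an illustration under simplifying assumptions, not RxJava's implementation: MiniObservable, MiniOnSubscribe, MiniSubscriber and FilteringSubscriber are hypothetical names, Predicate stands in for Func1, and errors, completion and backpressure are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterFlowSketch {
    /** Hypothetical stand-in for rx.Subscriber, reduced to onNext(). */
    interface MiniSubscriber<T> {
        void onNext(T value);
    }

    /** Hypothetical stand-in for Observable.OnSubscribe. */
    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<T> subscriber);
    }

    /** Plays the role of FilterSubscriber: forwards only accepted values. */
    static final class FilteringSubscriber<T> implements MiniSubscriber<T> {
        private final MiniSubscriber<T> actual;
        private final Predicate<T> predicate;

        FilteringSubscriber(MiniSubscriber<T> actual, Predicate<T> predicate) {
            this.actual = actual;
            this.predicate = predicate;
        }

        @Override
        public void onNext(T value) {
            if (predicate.test(value)) {
                actual.onNext(value);
            }
        }
    }

    /** Hypothetical stand-in for rx.Observable. */
    static final class MiniObservable<T> {
        private final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // filter() returns a NEW observable whose OnSubscribe wraps the child
        // and subscribes the wrapper to the original (source) observable.
        MiniObservable<T> filter(Predicate<T> predicate) {
            MiniObservable<T> source = this;
            return new MiniObservable<T>(child ->
                    source.subscribe(new FilteringSubscriber<T>(child, predicate)));
        }

        void subscribe(MiniSubscriber<T> subscriber) {
            onSubscribe.call(subscriber);
        }
    }

    /** Logs which subscriber the source sees and which values reach the end. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> source = new MiniObservable<Integer>(subscriber -> {
            log.add("source sees " + subscriber.getClass().getSimpleName());
            for (int i = 1; i <= 10; i++) {
                subscriber.onNext(i);
            }
        });
        source.filter(value -> value >= 5).subscribe(value -> log.add("got " + value));
        return log;
    }

    public static void main(String[] args) {
        // First entry: "source sees FilteringSubscriber"; then "got 5" through "got 10".
        System.out.println(demo());
    }
}
```

The source's OnSubscribe receives the wrapper, not the subscriber passed to the final subscribe() call, which is the point the walkthrough above makes about FilterSubscriber.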

Let’s test our little conclusion:

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // log the concrete class of the subscriber the source receives
        Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName());
        subscriber.onNext(5);
    }
}).filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        return integer > 0;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        // nothing to do here; only the log above matters
    }
});

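Running results: the logged class name is that of FilterSubscriber (the class nested in OnSubscribeFilter), not that of the subscriber we passed to subscribe(), which is what the analysis above predicts.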


I hope this was understandable. I'm happy to discuss and learn together, so please leave a comment.