Operator Introduction

Operator: processes the emitted data and propagates the change. Transformations are implemented by operators, and the transformed data is passed downstream.

1. RxJava1 operator source analysis

1. Func1 interface
2. Operator interface

1.1 RxJava1 instance

                Observable.create(new Observable.OnSubscribe<String>() {
                            @Override
                            public void call(Subscriber<? super String> subscriber) {
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext("1");
                                    subscriber.onNext("2");
                                    subscriber.onCompleted();
                                }
                            }
                        }). // processing
                        map(new Func1<String, Integer>() {
                            @Override
                            public Integer call(String s) {
                                return Integer.parseInt(s) + 2;
                            }
                        }).
                        subscribe(new Observer<Integer>() {
                            @Override
                            public void onCompleted() {
                                Log.d("kpioneer", "onCompleted:");
                            }

                            @Override
                            public void onError(Throwable e) {
                            }

                            @Override
                            public void onNext(Integer integer) {
                                Log.d("kpioneer", "onNext:" + integer + ",integer instanceOf" + integer.getClass());
                            }
                        });
Run result:

06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:
1.2 RxJava1 operator source
RxJava1 OnSubscribeMap class

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}
RxJava1 OnSubscribeLift class

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}
1.3 Transformation principle (core operator: lift)

1. Accept the original OnSubscribe and the current Operator
2. Create a new OnSubscribe and return a new Observable


    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

3. Wrap the old Subscriber in a new Subscriber


        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);

4. Perform the conversion in the new Subscriber, then pass the result to the old Subscriber


        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

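Analysis

The core implementation uses a proxy mechanism: each operator wraps the downstream Subscriber in a new Subscriber that converts the data and then forwards it.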
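
The proxy idea can be tried outside RxJava. The following is a minimal sketch, not the library's code: Sink, MapSink and run are hypothetical names invented for this illustration, and the unsubscribe handling, fatal-exception check and RxJavaHooks call found in MapSubscriber are deliberately left out. It only shows the wrapped consumer receiving converted values, and a failing mapper being turned into an onError signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class MapProxySketch {

    /** Hypothetical downstream consumer; a stand-in for Subscriber, not an RxJava type. */
    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** The proxy: accepts T from upstream, converts it, and forwards R to the wrapped sink. */
    static final class MapSink<T, R> implements Sink<T> {
        private final Sink<? super R> actual;
        private final Function<? super T, ? extends R> mapper;
        private boolean done;

        MapSink(Sink<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return; // ignore values that arrive after a terminal event
            }
            R result;
            try {
                result = mapper.apply(value);
            } catch (RuntimeException ex) {
                onError(ex); // a failing mapper becomes an error signal for the wrapped sink
                return;
            }
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable error) {
            if (done) {
                return;
            }
            done = true;
            actual.onError(error);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            actual.onCompleted();
        }
    }

    /** Pushes the inputs through a parse-and-add-2 proxy and records what the wrapped sink sees. */
    static List<String> run(String... inputs) {
        final List<String> events = new ArrayList<>();
        Sink<Integer> downstream = new Sink<Integer>() {
            @Override public void onNext(Integer value) { events.add("onNext:" + value); }
            @Override public void onError(Throwable error) { events.add("onError:" + error.getClass().getSimpleName()); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        Sink<String> proxy = new MapSink<String, Integer>(downstream, s -> Integer.parseInt(s) + 2);
        for (String input : inputs) {
            proxy.onNext(input);
        }
        proxy.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("1", "2"));      // [onNext:3, onNext:4, onCompleted]
        System.out.println(run("1", "x", "3")); // [onNext:3, onError:NumberFormatException]
    }
}
```

In the second call the value "3" and the final onCompleted never reach the wrapped sink, because the proxy marks itself done once it has delivered the error.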

2. RxJava2 operator source analysis

2.1 RxJava2 instance

                Observable.
                        create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception {
                                if (!e.isDisposed()) {
                                    e.onNext("1");
                                    e.onNext("2");
                                    e.onComplete();
                                }
                            }
                        }).
                        map(new Function<String, Integer>() {
                            @Override
                            public Integer apply(String s) throws Exception {
                                return Integer.parseInt(s)+2;
                            }
                        }).
                        subscribe(new Observer<Integer>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                Log.d("kpioneer", "onSubscribe:");
                            }

                            @Override
                            public void onNext(Integer value) {
                                Log.d("kpioneer", "onNext:" + value);
                            }

                            @Override
                            public void onError(Throwable e) {
                            }

                            @Override
                            public void onComplete() {
                                Log.d("kpioneer", "onComplete");
                            }
                        });

                Flowable.
                        create(new FlowableOnSubscribe<String>() {
                            @Override
                            public void subscribe(FlowableEmitter<String> e) throws Exception {
                                if (!e.isCancelled()) {
                                    e.onNext("1");
                                    e.onNext("2");
                                    e.onComplete();
                                }
                            }
                        }, BackpressureStrategy.DROP).
                        map(new Function<String, Integer>() {
                            @Override
                            public Integer apply(String s) throws Exception {
                                return Integer.parseInt(s)+2;
                            }
                        }).
                        subscribe(new Subscriber<Integer>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                s.request(Long.MAX_VALUE);
                                Log.d("kpioneer", "onSubscribe");
                            }

                            @Override
                            public void onNext(Integer integer) {
                                Log.d("kpioneer", "onNext:" + integer);
                            }

                            @Override
                            public void onError(Throwable t) {
                            }

                            @Override
                            public void onComplete() {
                                Log.d("kpioneer", "onComplete");
                            }
                        });
Run result:

06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe:
06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.768 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
2.2 RxJava2 operator source code

Function interface

ObservableMap: no back pressure

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

1. ObservableMap extends the AbstractObservableWithUpstream abstract class
2. It implements the subscribeActual method of AbstractObservableWithUpstream
3. In subscribeActual, the original Observable is subscribed with the converted Observer


public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
    /** The actual operator. */
    final ObservableOperator<? extends R, ? super T> operator;

    public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    public void subscribeActual(Observer<? super R> s) {
        Observer<? super T> observer;
        try {
            observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Disposable already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }

        source.subscribe(observer);
    }
}
FlowableMap: with back pressure

public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;
    public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
        super(source);
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<T, U>(s, mapper));
        }
    }

    static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }

    static final class MapConditionalSubscriber<T, U> extends BasicFuseableConditionalSubscriber<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? extends U> function) {
            super(actual);
            this.mapper = function;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public boolean tryOnNext(T t) {
            if (done) {
                return false;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return true;
            }
            return actual.tryOnNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

1. FlowableMap extends AbstractFlowableWithUpstream
2. It implements the subscribeActual method of AbstractFlowableWithUpstream
3. In subscribeActual, the original Flowable is subscribed with the converted Subscriber

FlowableLift


public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {
    /** The actual operator. */
    final FlowableOperator<? extends R, ? super T> operator;

    public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    public void subscribeActual(Subscriber<? super R> s) {
        try {
            Subscriber<? super T> st = operator.apply(s);

            if (st == null) {
                throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
            }

            source.subscribe(st);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
}
2.3 The Operator interface

1. Implement this interface
2. Perform the transformation in subscribeActual
3. Use it to extend custom operators

Analysis

Both the back-pressure and the non-back-pressure implementations in RxJava2 rely on the same proxy mechanism at their core.
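
With back pressure, the proxy has one extra job: the downstream's demand must still reach the source. The following is a minimal sketch under stated assumptions, not RxJava's implementation: Demand, Receiver, emit and MapReceiver are hypothetical names for this illustration, Demand is only loosely modelled on a Reactive Streams Subscription, and the source is single-threaded and not safe against re-entrant requests. It shows a map proxy that converts values while passing the demand handle through untouched, so the source only emits what was requested.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class BackpressureMapSketch {

    /** Hypothetical demand handle, loosely modelled on a Reactive Streams Subscription. */
    interface Demand {
        void request(long n);
    }

    /** Hypothetical consumer that receives its demand handle before any value. */
    interface Receiver<T> {
        void onCall(Demand demand);
        void onReceive(T value);
        void onCompleted();
    }

    /** Source that hands out items only as they are requested. */
    static <T> void emit(final List<T> items, final Receiver<T> receiver) {
        receiver.onCall(new Demand() {
            private int index;
            private boolean completed;

            @Override
            public void request(long n) {
                while (n-- > 0 && index < items.size()) {
                    receiver.onReceive(items.get(index++));
                }
                if (index == items.size() && !completed) {
                    completed = true;
                    receiver.onCompleted();
                }
            }
        });
    }

    /** Proxy for map: converts values and passes the demand handle through unchanged. */
    static final class MapReceiver<T, R> implements Receiver<T> {
        private final Receiver<? super R> actual;
        private final Function<? super T, ? extends R> mapper;

        MapReceiver(Receiver<? super R> actual, Function<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override public void onCall(Demand demand) { actual.onCall(demand); }
        @Override public void onReceive(T value) { actual.onReceive(mapper.apply(value)); }
        @Override public void onCompleted() { actual.onCompleted(); }
    }

    /** Runs "1", "2", "3" through the proxy with a downstream that requests the given amount. */
    static List<String> run(final long requested) {
        final List<String> events = new ArrayList<>();
        Receiver<Integer> downstream = new Receiver<Integer>() {
            @Override public void onCall(Demand demand) {
                events.add("onCall");
                demand.request(requested);
            }
            @Override public void onReceive(Integer value) { events.add("onReceive:" + value); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        };
        emit(Arrays.asList("1", "2", "3"),
                new MapReceiver<String, Integer>(downstream, s -> Integer.parseInt(s) + 2));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(2));              // [onCall, onReceive:3, onReceive:4]
        System.out.println(run(Long.MAX_VALUE)); // [onCall, onReceive:3, onReceive:4, onReceive:5, onCompleted]
    }
}
```

With a request of 2 the third item is never emitted and the stream does not complete, which is the difference from the non-back-pressure case where the source pushes everything it has.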

3. Imitating the RxJava1 operator functionality

Operator interface implementation
  1. The Operator interface is the abstraction for operators
  2. The Operator interface handles the specific transformation
The lift operator
  1. The basic principle behind every transformation
  2. Each operator implements the Operator interface and is applied through lift
The map operator
  1. The most basic operator
  2. As the name suggests, it is used for mapping

public class Caller<T> {
    final OnCall<T> onCall;

    public Caller(OnCall<T> onCall) {
        this.onCall = onCall;
    }

    public static <T> Caller<T> create(OnCall<T> onCall) {
        return new Caller<>(onCall);
    }

    public Calling call(Receiver<T> receiver) {
        this.onCall.call(receiver);
        return receiver;
    }

    public final <R> Caller<R> lift(final Operator<R, T> operator) {
        return create(new OnCallLift<>(onCall, operator));
    }

    public final <R> Caller<R> map(Func1<T, R> func) {
        return lift(new MapOperator<T, R>(func));
    }

    public interface OnCall<T> extends Action1<Receiver<T>> {
    }

    public interface Operator<R, T> extends Func1<Receiver<R>, Receiver<T>> {
    }
}

public interface Func1<T, R> {
    R call(T t);
}

public class MapOperator<T, R> implements Caller.Operator<R, T> {


    private final Func1<T, R> mapper;

    public MapOperator(Func1<T, R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Receiver<T> call(Receiver<R> rReceiver) {
        return new MapReceiver<>(rReceiver, this.mapper);
    }
}

public class MapReceiver<T, R> extends Receiver<T> {
    private final Receiver<R> actual;
    private final Func1<T, R> mapper;

    public MapReceiver(Receiver<R> actual, Func1<T, R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }

    @Override
    public void onCompleted() {
        this.actual.onCompleted();
    }

    @Override
    public void onError(Throwable t) {
        this.actual.onError(t);
    }

    @Override
    public void onReceive(T t) {
        R tR = this.mapper.call(t);
        this.actual.onReceive(tR);
    }
}

public class OnCallLift<T, R> implements Caller.OnCall<R> {

    private final Caller.OnCall<T> parent;

    private final Caller.Operator<R, T> operator;

    public OnCallLift(Caller.OnCall<T> parent, Caller.Operator<R, T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Receiver<R> rReceiver) {
        Receiver<T> tReceiver = this.operator.call(rReceiver);
        this.parent.call(tReceiver);
    }
}
Calling code:

public class Lesson2_2Activity extends AppCompatActivity {
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);

    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        Caller.
                create(new Caller.OnCall<String>() {
                    @Override
                    public void call(Receiver<String> stringReceiver) {
                        if (!stringReceiver.isUnCalled()) {
                            stringReceiver.onReceive("1");
                            stringReceiver.onReceive("2");
                            stringReceiver.onCompleted();
                        }
                    }
                }).
                map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s)+2;
                    }
                }).
                call(new Receiver<Integer>() {
                    @Override
                    public void onCompleted() {

                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }
                });
    }
}
Log output

06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onCompleted
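
Because every operator goes through lift, adding another operator only means writing another Operator. The following is a minimal sketch of that idea with a filter operator. It is self-contained, so it redeclares simplified versions of Receiver, OnCall, Operator and Caller rather than reusing the classes above (no onError, no Calling return value), and filter itself is a hypothetical addition that the imitation library does not contain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class LiftFilterSketch {

    interface Receiver<T> {
        void onReceive(T value);
        void onCompleted();
    }

    interface OnCall<T> {
        void call(Receiver<T> receiver);
    }

    /** Maps the downstream Receiver<R> to the Receiver<T> that is handed to the source. */
    interface Operator<R, T> {
        Receiver<T> call(Receiver<R> downstream);
    }

    static final class Caller<T> {
        private final OnCall<T> onCall;

        Caller(OnCall<T> onCall) {
            this.onCall = onCall;
        }

        void call(Receiver<T> receiver) {
            onCall.call(receiver);
        }

        /** Builds a new OnCall that wraps the receiver with the operator, then calls the parent. */
        <R> Caller<R> lift(final Operator<R, T> operator) {
            final OnCall<T> parent = this.onCall;
            return new Caller<R>(new OnCall<R>() {
                @Override
                public void call(Receiver<R> downstream) {
                    parent.call(operator.call(downstream));
                }
            });
        }

        /** Hypothetical filter operator: only a new Operator is written, lift does the wiring. */
        Caller<T> filter(final Predicate<T> predicate) {
            return lift(new Operator<T, T>() {
                @Override
                public Receiver<T> call(final Receiver<T> downstream) {
                    return new Receiver<T>() {
                        @Override
                        public void onReceive(T value) {
                            if (predicate.test(value)) {
                                downstream.onReceive(value);
                            }
                        }

                        @Override
                        public void onCompleted() {
                            downstream.onCompleted();
                        }
                    };
                }
            });
        }
    }

    /** Emits 1..4, keeps the even values, and records what the final receiver sees. */
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        Caller<Integer> source = new Caller<Integer>(new OnCall<Integer>() {
            @Override
            public void call(Receiver<Integer> receiver) {
                for (int i = 1; i <= 4; i++) {
                    receiver.onReceive(i);
                }
                receiver.onCompleted();
            }
        });
        source.filter(n -> n % 2 == 0).call(new Receiver<Integer>() {
            @Override public void onReceive(Integer value) { events.add("onReceive:" + value); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onReceive:2, onReceive:4, onCompleted]
    }
}
```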

3. Imitating the RxJava2 (no back pressure) operator functionality

CallerWithUpstream (similar to AbstractObservableWithUpstream)
  1. An abstract class
  2. Has a callActual method
  3. Concrete operators implement this method
The map operator
  1. The most basic operator
  2. As the name suggests, it is used for mapping
CallerOperator interface
  1. The transformation is performed in callActual
  2. Can be used to extend operators

Related code:


public abstract class Caller<T> {
    public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
        return new CallerCreate<>(callerOnCall);
    }

    public void call(Callee<T> callee) {
        callActual(callee);
    }

    protected abstract void callActual(Callee<T> callee);

    public <R> Caller<R> lift(CallerOperator<R, T> operator) {
        return new CallerLift<>(this, operator);
    }

    public <R> Caller<R> map(Function<T, R> function) {
        return new CallerMap<>(this, function);
    }
}

public interface CallerOperator<T, R> {
    Callee<R> call(Callee<T> callee);
}

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: Return source Caller
 */

public interface CallerSource<T> {
    Caller<T> source();
}

public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> {
    protected final Caller<T> source;

    public CallerWithUpstream(Caller<T> source) {
        this.source = source;
    }

    @Override
    public Caller<T> source() {
        return source;
    }
}

public class CallerLift<R, T> extends CallerWithUpstream<T, R> {
    private final CallerOperator<R, T> mOperator;

    public CallerLift(Caller<T> source, CallerOperator<R, T> mOperator) {
        super(source);
        this.mOperator = mOperator;
    }

    @Override
    protected void callActual(Callee<R> callee) {
        Callee<T> tCallee = mOperator.call(callee);
        source.call(tCallee);
    }
}

public interface Function<T, R> {

    R call(T t);
}

public class CallerMap<T, R> extends CallerWithUpstream<T, R> {
    private Function<T, R> function;

    public CallerMap(Caller<T> source, Function<T, R> function) {
        super(source);
        this.function = function;
    }

    @Override
    protected void callActual(Callee<R> callee) {
        source.call(new MapCallee<>(callee, function));
    }

    static class MapCallee<T, R> implements Callee<T> {
        private final Callee<R> mCallee;
        private final Function<T, R> mFunction;
        public MapCallee(Callee<R> mCallee, Function<T, R> mFunction) {
            this.mCallee = mCallee;
            this.mFunction = mFunction;
        }

        @Override
        public void onCall(Release release) {
            mCallee.onCall(release);
        }

        @Override
        public void onReceive(T t) {
            R tR = mFunction.call(t);
            mCallee.onReceive(tR);
        }

        @Override
        public void onCompleted() {
            mCallee.onCompleted();
        }

        @Override
        public void onError(Throwable t) {
            mCallee.onError(t);
        }
    }
}

/**
 * Created by Xionghu on 2018/6/11.
 */

public class Lesson2_3Activity extends AppCompatActivity {
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);

    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        Caller.
                create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("1");
                        callerEmitter.onReceive("2");
                        callerEmitter.onCompleted();
                    }
                }).
                map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                }).
                call(new Callee<Integer>() {
                    @Override
                    public void onCall(Release release) {
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                    }
                });

        Caller.
                create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("3");
                        callerEmitter.onReceive("4");
                        callerEmitter.onCompleted();
                    }
                }).
                lift(new CallerOperator<Integer, String>() {
                    @Override
                    public Callee<String> call(final Callee<Integer> callee) {
                        return new Callee<String>() {
                            @Override
                            public void onCall(Release release) {
                                callee.onCall(release);
                            }

                            @Override
                            public void onReceive(String s) {
                                callee.onReceive(Integer.parseInt(s));
                            }

                            @Override
                            public void onCompleted() {
                                callee.onCompleted();
                            }

                            @Override
                            public void onError(Throwable t) {
                                callee.onError(t);
                            }
                        };
                    }
                }).
                call(new Callee<Integer>() {
                    @Override
                    public void onCall(Release release) {
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }
                });
    }
}

06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted

4. Imitating the RxJava2 (with back pressure) operator functionality

TelephonerOperator interface
  1. The transformation is performed in callActual
  2. Can be used to extend operators
TelephonerWithUpstream (similar to AbstractFlowableWithUpstream)
  1. An abstract class
  2. Has a callActual method
  3. Concrete operators implement this method
Related source code:

public abstract class Telephoner<T> {
    public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall){
        return new TelephonerCreate<>(telephonerOnCall);
    }

    public void call(Receiver<T> receiver) {
        callActual(receiver);
    }

    protected abstract void callActual(Receiver<T> receiver);

    public <R> Telephoner<R> map(Function<T, R> function) {
        return new TelephonerMap<>(this, function);
    }

    public <R> Telephoner<R> lift(TelephonerOperator<R, T> telephonerOperator) {
        return new TelephonerLift<>(this, telephonerOperator);
    }
}

/**
 * Created by Xionghu on 2018/6/12.
 * Desc: lift operator
 */

public class TelephonerLift<R, T> extends TelephonerWithUpstream<T, R> {


    private final TelephonerOperator<R, T> operator;

    public TelephonerLift(Telephoner<T> source, TelephonerOperator<R, T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    protected void callActual(Receiver<R> receiver) {
        Receiver<T> tReceiver = operator.call(receiver);
        source.call(tReceiver);
    }
}

import com.haocai.mylibrary.rxJava2.Function;

/**
 * Created by Xionghu on 2018/6/12.
 * Desc: map operator
 */

public class TelephonerMap<T, R> extends TelephonerWithUpstream<T, R> {

    private Function<T, R> trFunction;

    public TelephonerMap(Telephoner<T> source, Function<T, R> trFunction) {
        super(source);
        this.trFunction = trFunction;
    }

    @Override
    protected void callActual(Receiver<R> receiver) {
        source.call(new MapReceiver<>(receiver, trFunction));
    }

    static class MapReceiver<T, R> implements Receiver<T> {

        private final Receiver<R> rReceiver;
        private final Function<T, R> trFunction;

        public MapReceiver(Receiver<R> rReceiver, Function<T, R> trFunction) {
            this.rReceiver = rReceiver;
            this.trFunction = trFunction;
        }

        @Override
        public void onCall(Drop d) {
            rReceiver.onCall(d);
        }

        @Override
        public void onReceive(T t) {
            R tr = trFunction.call(t);
            rReceiver.onReceive(tr);
        }

        @Override
        public void onError(Throwable t) {
            rReceiver.onError(t);
        }

        @Override
        public void onCompleted() {
            rReceiver.onCompleted();
        }
    }
}

/**
 * Created by Xionghu on 2018/6/12.
 * Desc: operator interface
 */

public interface TelephonerOperator<T, R> {
    // Given the downstream Receiver<T>, returns the Receiver<R> that the upstream will call.
    Receiver<R> call(Receiver<T> callee);
}


/**
 * Created by Xionghu on 2018/6/11.
 * Desc: Telephoner
 */
public interface TelephonerSource<T> {
    Telephoner<T> source();
}

public abstract class TelephonerWithUpstream<T, R> extends Telephoner<R> implements TelephonerSource<T> {
    // The upstream Telephoner that this operator reads from.
    protected final Telephoner<T> source;

    public TelephonerWithUpstream(Telephoner<T> source) {
        this.source = source;
    }

    @Override
    public Telephoner<T> source() {
        return source;
    }
}

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import com.haocai.mylibrary.rxJava2.Function;
import com.haocai.mylibrary.rxJava2.backpressure.Drop;
import com.haocai.mylibrary.rxJava2.backpressure.Receiver;
import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;
import com.haocai.rxjavademo.R;

import butterknife.ButterKnife;
import butterknife.OnClick;

/**
 * Created by Xionghu on 2018/6/11.
 */

public class Lesson2_4Activity extends AppCompatActivity {
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);

    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {

        Telephoner.
                create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("1");
                        telephonerEmitter.onReceive("2");
                        telephonerEmitter.onCompleted();
                    }
                }).
                map(new Function<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                }).
                call(new Receiver<Integer>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }
                });

        Telephoner.
                create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("3");
                        telephonerEmitter.onReceive("4");
                        telephonerEmitter.onCompleted();
                    }
                }).
                lift(new TelephonerOperator<Integer, String>() {
                    @Override
                    public Receiver<String> call(final Receiver<Integer> receiver) {
                        return new Receiver<String>() {
                            @Override
                            public void onCall(Drop d) {
                                receiver.onCall(d);
                            }

                            @Override
                            public void onReceive(String s) {
                                receiver.onReceive(Integer.parseInt(s));
                            }

                            @Override
                            public void onError(Throwable t) {
                                receiver.onError(t);
                            }

                            @Override
                            public void onCompleted() {
                                receiver.onCompleted();
                            }
                        };
                    }
                }).
                call(new Receiver<Integer>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                        Log.d("kpioneer", "onCall");
                    }

                    @Override
                    public void onReceive(Integer integer) {
                        Log.d("kpioneer", "onReceive:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("kpioneer", "onError");
                    }

                    @Override
                    public void onCompleted() {
                        Log.d("kpioneer", "onCompleted");
                    }
                });
    }
}

06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
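
In the run above, lift only repeats what map already does. The same hook can also host operators that a one-to-one mapping function cannot express, which is the sense in which the operator interface "can be used to extend operators". The sketch below shows one such case, a filter, under stated assumptions: the filter helper and the class name are hypothetical and not part of the article's library, and Receiver is again cut down to two callbacks with no Drop, so nothing here deals with backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterOperatorSketch {

    /** Simplified assumption: only the two callbacks needed for the example. */
    interface Receiver<T> {
        void onReceive(T t);
        void onCompleted();
    }

    /** Same shape as TelephonerOperator: downstream receiver in, upstream-facing receiver out. */
    interface Operator<T, R> {
        Receiver<R> call(Receiver<T> downstream);
    }

    /** Hypothetical filter operator: forwards only the items that pass the predicate. */
    static <T> Operator<T, T> filter(Predicate<T> keep) {
        return downstream -> new Receiver<T>() {
            @Override
            public void onReceive(T t) {
                if (keep.test(t)) {
                    downstream.onReceive(t);
                }
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        };
    }

    /** Feeds 1..4 through the filter and returns what reached the downstream receiver. */
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        Receiver<Integer> downstream = new Receiver<Integer>() {
            @Override
            public void onReceive(Integer i) {
                received.add(i);
            }

            @Override
            public void onCompleted() {
                // Nothing to record for completion in this sketch.
            }
        };
        Operator<Integer, Integer> evens = filter(i -> i % 2 == 0);
        // This mirrors what a lift-style callActual does: wrap first, then feed the wrapper.
        Receiver<Integer> wrapped = evens.call(downstream);
        for (int i = 1; i <= 4; i++) {
            wrapped.onReceive(i);
        }
        wrapped.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A real backpressure-aware filter would also have to account for dropped items against the downstream's requested count; that bookkeeping is deliberately omitted here.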