preface

Introduction to global transformation
  1. Put a bunch of transformations together
  2. Used for fixed transformation scenes

Overall transformation embodies the idea of reactive programming: a change to the whole stream is implemented through a Transformer, and once applied it propagates downstream.

1.RxJava1 overall transformation instance

                // Emits "test" once, then applies a single Transformer that moves the
                // subscription to Schedulers.newThread() and the callbacks to the main thread.
                Observable.create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext("test");
                            Log.d(TAG, "currentThread:" + Thread.currentThread());
                            subscriber.onCompleted();
                        }
                    }
                }).compose(new Observable.Transformer<String, String>() {

                    @Override
                    public Observable<String> call(Observable<String> stringObservable) {
                        // Both scheduling operators are bundled in one reusable transformation.
                        return stringObservable.
                                subscribeOn(Schedulers.newThread()).
                                observeOn(AndroidSchedulers.mainThread());
                    }
                }).subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        // No-op in this demo.
                    }

                    @Override
                    public void onError(Throwable e) {
                        // No-op in this demo: the source above only calls onNext/onCompleted.
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext:" + s);
                        Log.d(TAG, "currentThread:" + Thread.currentThread());
                    }
                });

06-19 14:26:18.275 12255-12350/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-19 14:26:18.275 12255-12255/com.haocai.rxjavademo D/kpioneer: onNext:test
06-19 14:26:18.275 12255-12255/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[main,5,main]
Copy the code
2.RxJava2 overall transformation instance

                   /*--------- no back pressure ---------*/
                // Observable variant: emits "test" once and uses one ObservableTransformer to
                // subscribe on a new thread and observe on the Android main thread.
                Observable.
                        create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                if (!emitter.isDisposed()) {
                                    Log.d(TAG, "No back pressure currentThread:" + Thread.currentThread());
                                    emitter.onNext("test");
                                    emitter.onComplete();
                                }
                            }
                        }).
                        compose(new ObservableTransformer<String, String>() {
                            @Override
                            public ObservableSource<String> apply(Observable<String> upstream) {
                                return upstream.
                                        subscribeOn(Schedulers.newThread()).
                                        observeOn(AndroidSchedulers.mainThread());
                            }
                        }).
                        subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {
                                // No-op in this demo.
                            }

                            @Override
                            public void onNext(String o) {
                                Log.d(TAG, "No back pressure onNext:" + o);
                                Log.d(TAG, "No back pressure currentThread:" + Thread.currentThread());
                            }

                            @Override
                            public void onError(Throwable e) {
                                // No-op in this demo: the source above never signals an error.
                            }

                            @Override
                            public void onComplete() {
                                // No-op in this demo.
                            }
                        });

                /*--------- has back pressure ---------*/
                // Flowable variant of the same pipeline, created with BackpressureStrategy.DROP
                // and composed through a FlowableTransformer.
                Flowable.
                        create(new FlowableOnSubscribe<String>() {
                            @Override
                            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                                if (!emitter.isCancelled()) {
                                    Log.d(TAG, "Back pressure currentThread:" + Thread.currentThread());
                                    emitter.onNext("test");
                                    emitter.onComplete();
                                }
                            }
                        }, BackpressureStrategy.DROP).
                        compose(new FlowableTransformer<String, String>() {
                            @Override
                            public Publisher<String> apply(Flowable<String> upstream) {
                                return upstream.
                                        subscribeOn(Schedulers.newThread()).
                                        observeOn(AndroidSchedulers.mainThread());
                            }
                        }).
                        subscribe(new Subscriber<String>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                // Request everything up front so the single item is delivered.
                                s.request(Long.MAX_VALUE);
                            }

                            @Override
                            public void onNext(String s) {
                                Log.d(TAG, "With back pressure onNext:" + s);
                                Log.d(TAG, "Back pressure currentThread:" + Thread.currentThread());
                            }

                            @Override
                            public void onError(Throwable t) {
                                // No-op in this demo: the source above never signals an error.
                            }

                            @Override
                            public void onComplete() {
                                // No-op in this demo.
                            }
                        });

06-19 14:45:08.515 17301-17407/com.haocai.rxjavademo D/kpioneer: No back pressure currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-19 14:45:08.515 17301-17301/com.haocai.rxjavademo D/kpioneer: No back pressure onNext:test
06-19 14:45:08.515 17301-17301/com.haocai.rxjavademo D/kpioneer: No back pressure currentThread:Thread[main,5,main]
06-19 14:45:08.525 17301-17408/com.haocai.rxjavademo D/kpioneer: Back pressure currentThread:Thread[RxNewThreadScheduler-2,5,main]
06-19 14:45:08.535 17301-17301/com.haocai.rxjavademo D/kpioneer: With back pressure onNext:test
06-19 14:45:08.535 17301-17301/com.haocai.rxjavademo D/kpioneer: Back pressure currentThread:Thread[main,5,main]
Copy the code

3. Principle analysis of RxJava1 Transformer

  1. Inheriting from the Func1 interface, the generic parameters are two Observables

    public interface Transformer<T.R> extends Func1<Observable<T>, Observable<R>> {
        // cover for generics insanity
    }
Copy the code
  2. It is the input parameter of the compose method

    /**
     * Hands this Observable to the given transformer and returns whatever Observable
     * the transformer produces.
     *
     * @param transformer the function applied to this Observable
     * @param <R> element type of the resulting Observable
     * @return the Observable returned by {@code transformer.call(this)}
     */
    public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
        // Narrow the wildcard signature to the exact types before invoking it.
        Transformer<T, R> typed = (Transformer<T, R>) transformer;
        return typed.call(this);
    }
Copy the code
  3. Passing in an Observable returns an Observable

4. Principle analysis of RxJava2 Transformer

4.1.RxJava2(No back pressure) ObservableTransformer
  1. There’s an apply method
  2. Passing in an Observable returns a new Observable
  3. Is the input parameter to the compose method


/**
 * Interface to compose Observables.
 *
 * @param <Upstream> the upstream value type
 * @param <Downstream> the downstream value type
 */
public interface ObservableTransformer<Upstream.Downstream> {
    /**
     * Applies a function to the upstream Observable and returns an ObservableSource with
     * optionally different element type.
     * @param upstream the upstream Observable instance
     * @return the transformed ObservableSource instance
     */
    @NonNull
    ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}
Copy the code

    /**
     * Wraps an ObservableSource into an Observable if not already an Observable.
     *
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code wrap} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     *
     * @param <T> the value type
     * @param source the source ObservableSource instance
     * @return the new Observable instance or the same as the source
     * @throws NullPointerException if source is null
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> wrap(ObservableSource<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        // Reuse the source when it already is an Observable; otherwise adapt it.
        final Observable<T> observable = source instanceof Observable
                ? (Observable<T>) source
                : new ObservableFromUnsafeSource<T>(source);
        return RxJavaPlugins.onAssembly(observable);
    }
Copy the code

    /**
     * Applies the given transformer to this Observable and wraps the returned
     * ObservableSource via {@code wrap}.
     *
     * @param composer the transformer to apply; rejected with "composer is null" when null
     * @param <R> element type of the resulting Observable
     * @return the wrapped result of {@code composer.apply(this)}
     */
    public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
        ObservableTransformer<T, R> transformer =
                (ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null");
        ObservableSource<R> transformed = transformer.apply(this);
        return wrap(transformed);
    }
Copy the code
4.2.RxJava2(with back pressure) FlowableTransformer
  1. There’s an apply method
  2. Passing in a Flowable returns a new Flowable
  3. Is the input parameter to the compose method

public interface FlowableTransformer<Upstream.Downstream> {
    /**
     * Applies a function to the upstream Flowable and returns a Publisher with
     * optionally different element type.
     * @param upstream the upstream Flowable instance
     * @return the transformed Publisher instance
     */
    @NonNull
    Publisher<Downstream> apply(@NonNull Flowable<Upstream> upstream);
}
Copy the code

    /**
     * Returns the given Publisher as a Flowable: the same instance when it already is a
     * Flowable, otherwise a new {@code FlowableFromPublisher} around it.
     *
     * @param <T> the value type
     * @param source the Publisher to convert; rejected with "publisher is null" when null
     * @return the Flowable view of {@code source}
     */
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.PASS_THROUGH)
    @SchedulerSupport(SchedulerSupport.NONE)
    @SuppressWarnings("unchecked")
    public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> source) {
        // instanceof is false for null, so a null source reaches the null check below.
        if (!(source instanceof Flowable)) {
            ObjectHelper.requireNonNull(source, "publisher is null");
            return RxJavaPlugins.onAssembly(new FlowableFromPublisher<T>(source));
        }
        return RxJavaPlugins.onAssembly((Flowable<T>) source);
    }
Copy the code

    /**
     * Applies the given transformer to this Flowable and converts the returned
     * Publisher via {@code fromPublisher}.
     *
     * @param composer the transformer to apply; rejected with "composer is null" when null
     * @param <R> element type of the resulting Flowable
     * @return the Flowable obtained from {@code composer.apply(this)}
     */
    @SuppressWarnings("unchecked")
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.PASS_THROUGH)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Flowable<R> compose(FlowableTransformer<? super T, ? extends R> composer) {
        FlowableTransformer<T, R> transformer =
                (FlowableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null");
        Publisher<R> transformed = transformer.apply(this);
        return fromPublisher(transformed);
    }
Copy the code

5. Imitating the RxJava1 Transformer


    public interface Converter<T.R> extends Func1<Caller<T>, Caller<R>> {}public final <R> Caller<R> unify(Converter<T, R> converter) {
        return converter.call(this);
    }
Copy the code
Example

public class Lesson4_2Activity extends AppCompatActivity {
    public static final String TAG = "kpioneer";

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);

    }

    @OnClick(R.id.testDo)
    public void onViewClicked(a) {
        Caller.
                create(new Caller.OnCall<String>() {
                    @Override
                    public void call(Receiver<String> stringReceiver) {
                        stringReceiver.onReceive("test");
                        Log.d(TAG, "currentThread:" + Thread.currentThread());
                    }
                }).
                unify(new Caller.Converter<String, String>() {
                    @Override
                    public Caller<String> call(Caller<String> stringCaller) {
                        return stringCaller.
                                callOn(new NewThreadSwitcher()).
                                callbackOn(new LooperSwitcher(getMainLooper()));
                    }
                }).
                call(new Receiver<String>() {
                    @Override
                    public void onCompleted(a) {}@Override
                    public void onError(Throwable t) {}@Override
                    public void onReceive(String s) {
                        Log.d(TAG, "onNext:" + s);
                        Log.d(TAG, "currentThread:"+ Thread.currentThread()); }}); }}Copy the code

06-19 15:55:38.735 477-477/com.haocai.rxjavademo D/kpioneer: onNext:test
06-19 15:55:38.735 477-477/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[main,5,main]
06-19 15:55:38.735 477-1180/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[NewThreadWorker,5,main]
Copy the code

6. Imitating the RxJava2 Transformer

6.1 RxJava2 no back pressure

    /**
     * Passes this Caller to the converter and returns the Caller it produces.
     *
     * @param callerConverter the conversion to apply to this Caller
     * @param <R> element type of the resulting Caller
     * @return the result of {@code callerConverter.convert(this)}
     */
    public <R> Caller<R> unify(CallerConverter<T,R> callerConverter){
        final Caller<R> converted = callerConverter.convert(this);
        return converted;
    }
Copy the code

** * Created by Xionghu on 2018/6/19. * Desc: Created by Xionghu on 2018/6/19

public interface TelephonerConverter<T.R> {
    Telephoner<R> convert(Telephoner<T> telephoner);
}
Copy the code
6.2 RxJava2 with back pressure

    /**
     * Passes this Telephoner to the converter and returns the Telephoner it produces.
     *
     * @param tTelephonerConverter the conversion to apply to this Telephoner
     * @param <R> element type of the resulting Telephoner
     * @return the result of {@code tTelephonerConverter.convert(this)}
     */
    public <R> Telephoner<R> unify(TelephonerConverter<T, R> tTelephonerConverter) {
        final Telephoner<R> converted = tTelephonerConverter.convert(this);
        return converted;
    }
Copy the code

** * Created by Xionghu on 2018/6/19. * Desc: Created by Xionghu on 2018/6/19

public interface TelephonerConverter<T.R> {
    Telephoner<R> convert(Telephoner<T> telephoner);
}
Copy the code
6.3 Example

/** * Created by Xionghu on 2018/6/11. * Desc: RxJava2 */

public class Lesson4_3Activity extends AppCompatActivity {
    public static final String TAG = "kpioneer";

    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);

    }

    @OnClick(R.id.testDo)
    public void onViewClicked(a) {
        Caller.
                create(new CallerOnCall<String>() {
                    @Override
                    public void call(CallerEmitter<String> callerEmitter) {
                        callerEmitter.onReceive("test");
                        Log.d(TAG, CurrentThread: No back pressure + Thread.currentThread());
                    }
                }).
                unify(new CallerConverter<String, String>() {
                    @Override
                    public Caller<String> convert(Caller<String> caller) {
                        return caller.
                                callOn(new NewThreadSwitcher()).
                                callbackOn(new LooperSwitcher(getMainLooper()));
                    }
                }).
                call(new Callee<String>() {
                    @Override
                    public void onCall(Release release) {}@Override
                    public void onReceive(String s) {
                        Log.d(TAG, "No back pressure onReceive:" + s);
                        Log.d(TAG, CurrentThread: No back pressure + Thread.currentThread());
                    }

                    @Override
                    public void onCompleted(a) {}@Override
                    public void onError(Throwable t) {}}); Telephoner. create(new TelephonerOnCall<String>() {
                    @Override
                    public void call(TelephonerEmitter<String> telephonerEmitter) {
                        telephonerEmitter.onReceive("test");
                        Log.d(TAG, "Back pressure currentThread:" + Thread.currentThread());
                    }
                }).
                unify(new TelephonerConverter<String, String>() {
                    @Override
                    public Telephoner<String> convert(Telephoner<String> telephoner) {
                        return telephoner.
                                callOn(new NewThreadSwitcher()).
                                callbackOn(new LooperSwitcher(getMainLooper()));
                    }
                }).
                call(new Receiver<String>() {
                    @Override
                    public void onCall(Drop d) {
                        d.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onReceive(String s) {
                        Log.d(TAG, "There is back pressure onReceive:" + s);
                        Log.d(TAG, "Back pressure currentThread:" + Thread.currentThread());
                    }

                    @Override
                    public void onError(Throwable t) {}@Override
                    public void onCompleted(a) {}}); }}Copy the code

06-19 16:28:23.215 11452-11698/com.haocai.rxjavademo D/kpioneer: No back pressure currentThread:Thread[NewThreadWorker,5,main]
06-19 16:28:23.235 11452-11703/com.haocai.rxjavademo D/kpioneer: Back pressure currentThread:Thread[NewThreadWorker,5,main]
06-19 16:28:23.235 11452-11452/com.haocai.rxjavademo D/kpioneer: No back pressure onReceive:test
06-19 16:28:23.235 11452-11452/com.haocai.rxjavademo D/kpioneer: No back pressure currentThread:Thread[main,5,main]
06-19 16:28:23.235 11452-11452/com.haocai.rxjavademo D/kpioneer: There is back pressure onReceive:test
06-19 16:28:23.235 11452-11452/com.haocai.rxjavademo D/kpioneer: Back pressure currentThread:Thread[main,5,main]
Copy the code