preface

  • RxjavaDue to itsChain call based on event flow, simple logic & easy to useThe characteristics of the deepAndroidDeveloper welcome.


Making screenshots

If you are not familiar with RxJava, check out Android: a clear and easy-to-understand introduction to RxJava

  • RxJava 2.0It was officially released on October 29, 2016. YesRxJava 1.0Made a major upgrade: actual useAPIAnd methods are very different

But RxJava 2.0 is very similar to RxJava 1.0

  • At the same time, due toRxJava 2.0RxJava 1.0Can’t be in one project, so if you’re usingRxJava 1.0Need to upgrade toRxJava 2.0Some changes need to be made
  • Today, I will bring you a summary of RxJava 2.0 relative to RxJava 1.0 and upgrade from RxJava 1.0 to RxJava 2.0 need to pay attention to the pit, I hope you will enjoy
  1. This series of articles is based onRxjava 2.0
  2. In the coming days, I will continue to publish a series of articles on Rxjava 2.0 in Android, including principles, operators, application scenarios, back pressure, etc. If you are interested, please continue to follow Carson_Ho’s Android development notes!!


Schematic diagram


directory


Schematic diagram


1. Dependency packages are changed

  • Due to theRxJava 2.0RxJava 1.0The dependencies cannot coexist in one project and need to be replaced
  • Change the following
/ / : originally ` RxJava 1.0 ` depend on the compile 'IO. Reactivex: rxandroid: 1.2.0' compile 'IO. Reactivex: RxJava: 1.1.5' / / changes: 2.0 ` ` RxJava depend on the compile 'IO. Reactivex. Rxjava2: rxandroid: 2.0.1' compile 'IO. Reactivex. Rxjava2: RxJava: 2.0.7' / / note: RxJava2 and RxJava1 cannot coexist, that is, dependencies cannot exist at the same timeCopy the code

2. Added a new implementation of the observed: Flowable

  • Due to theRxJava 1.0The observed inObservableDoes not support back pressure well (Backpressure)
  • So, inRxJava 2.0Added a new implementation of the observedFlowableTo support back pressureBackpressure
  1. And the old implementation of the observedObservableBack pressure is no longer supportedBackpressure
  2. FlowableThe use of andObservableVery similar, see article for details on how to use:Android RxJava back pressure strategy: graphics + examples full parsing

3. Difference between creating an Observable and an Observer

Observables and Observers are created differently in RxJava 2.0:

  • For creating observed (Observable)
<-- > Observable Observable = observable. create(new observable. OnSubscribe<String>() {@override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); }}); <-- create observed in RxJava 2.0 --> ObservableOnSubscribe Observable<Integer> Observable =Observable.create(new) ObservableOnSubscribe<Integer>() {ObservableOnSubscribe<Integer>() {ObservableOnSubscribe (ObservableEmitter) The parameters have also changed, Subscriber -> ObservableEmitter = ObservableEmitter @override public void subscribe(ObservableEmitter<Integer> e) throws Exception { OnNext (T value), onComplete(), and onError(Throwable e) e.onnext (1); e.onNext(2); E.onror (new Exception(" error ")); e.onComplete(); }});Copy the code
  • For creating an observer (Observer)
< p style = "box-sizing: border-box! Important; word-wrap: break-word! Important; Observer<String> Observer = new Observer<String>() {@override public void onNext(String s) {log.d (tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!" ); } @Override public void onError(Throwable e) { Log.d(tag, "Error!" ); }}; // Subscriber interface (an abstract class that implements the Observer interface) // Differences between Subscriber interface and Observer interface: OnStart (), unsubscribe (), Subscriber<String> Subscriber = new Subscriber<String>() {@override public void onNext(String s) {log.d (tag,  "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!" ); } @Override public void onError(Throwable e) { Log.d(tag, "Error!" ); }}; <-- Observer<Integer> Observer = new Observer<Integer>() {// change 1: Add callback method onSubscribe() Override public void onSubscribe(Disposable d) {// The Disposable argument that is passed in is used as a switch of the subscription relationship. // Note: Call Dispose () = The observer cannot receive the event, } @override public void onNext(Integer value) {} @override public void onError(Throwable e) {} Public void onComplete() {}}Copy the code

4. Simplify subscription methods

  • In terms of simplifying subscriptions,RxJava 1Mainly adoptsActionXInterface &FuncXinterface
  • inRxJava 2Is the name of the series of interfacesIn accordance with theJava8Naming rules ofHas been modified, but the use method remains the same

4.1 ActionX and FuncX were renamed

  • forActionXThe interface name changed
RxJava 1 RxJava 2
Action0 Action
Action1 Consumer (accepts 1 parameter)
Action2 BiConsumer (accepts 2 parameters)
ActionN Consumer<Object[]> (accepts multiple parameters)
Action3 – Action9 No longer use
  • forFuncXThe interface name changed
RxJava 1 RxJava 2
Func Function (for transforming objects)
Func2 BiFunction
Func3 – Func9 Function3 – Function9
FuncN Function<Object[], R>
  • Specific as follows


Schematic diagram

  • The sample
<-- example 1 --> Disposable < observable. Subscribe (new Consumer<Integer>() {@override public void accept(Integer) Integer) throws Exception {// Here to receive data items}}, New Consumer<Throwable>() {@override public void accept(Throwable) throws Exception {// New Action() {@override public void run() throws Exception {// Accept onComplete. }}); <-- example 2 --> Flowable. Subscribe (new Consumer<String>() {// equivalent to onNext @override public void accept(String s) throws Exception { } }, New Consumer<Throwable>() {onerror@override public void accept(Throwable) throws Exception {}}, New Action() {// equivalent to onComplete, Override public void run() throws Exception {}} New Consumer<Subscription>() {// equivalent to onSubscribe @override public void accept(Subscription Subscription) throws Exception {}});Copy the code

4.2 RxJava2 interface methods all allow exceptions to be thrown

Throws Exception is added to the interface method

// Action interface public interface Action {void run() throws Exception; } // Consumer interface public interface Consumer<T> {void accept(T T) throws Exception; RxJava 2.0 no longer supports null values. A NullPointerException will be thrown if a null is passedCopy the code

5. Operator changes

  • For operators,RxJava 1.0withRxJava 2.0Most agree on naming & behavior
  • It’s important to emphasize thatThe first (),subscribeWith() andcompose()The operator

5.1 The first () operator

  • Change the following
RxJava 1.0 RxJava 2.0
The first () Rename to: firstElement ()
first(Func1) Deprecated, use: filter(predicate).first()
firstOrDefault(T) Renamed: First (T)
firstOrDefault(Func1, T) Renamed: First (T)
  • The sample
<-- RxJava 1.0 --> Observable. concat(observable. from(list)). First (new Func1<Data, Boolean>() { @Override public Boolean call(Data data) { return DataUtils.isAvailable(data); } }).publish(); <-- RxJava 2.0 --> Observable. Concat (Observable. FromIterable (list)). Filter (new Predicate<Data>() {@override public boolean test(@NonNull Data data) throws Exception { return DataUtils.isAvailable(data); } }).firstElement().toObservable().publish();Copy the code

5.2 subscribeWith () operator

Please see the following figure for details:




Schematic diagram

5.3 Compose () operator

The main changes are:

  1. RxJava 1.0What is achieved is:rx.Observable.Transformerinterface

Inherit from Func1

, Observable

Private static <T> Observable.Transformer<T, T> createIOSchedulers() { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> tObservable) { return tObservable.subscribeOn(Schedulers.io()) .unsubscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()); }}; } public static <T> Observable.Transformer<JsonResult<T>,T> applySchedulers() {return createIOSchedulers (); } Action1<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Subscription subscription = Observable.from(items) .compose(RxUtil.<String>applySchedulers()) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.valueOf(s); } }) .subscribe(onNext);Copy the code
  1. RxJava 2.0Implementation isio.reactivex.ObservableTansformer<Upstream, Downstream>

A separate interface

Public static <T> ObservableTransformer<T, T> io2MainObservable() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); }}; } public static <T> ObservableTransformer<T, T> applySchedulers() {return io2MainObservable (); } Consumer<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Disposable disposable = Observable.fromArray(items) .compose(RxUtil.<String>applySchedulers()) .map(new Function<String,  Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.valueOf(s); } }) .subscribe(onNext);Copy the code

6. Additional

6.1 the new Processor

  • It works like thisSubject& inherited fromFlowable= Support back pressure control

Subject does not support back pressure control

  • Use the following
//Processor AsyncProcessor<String> processor = AsyncProcessor.create(); processor.subscribe(o -> Log.d("JG",o)); //three processor.onNext("one"); processor.onNext("two"); processor.onNext("three"); processor.onComplete(); //Subject AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(o -> Log.d("JG",o)); //three subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete();Copy the code

6.2 change Single

  • SingleThe action is similar toObservable= Send data, but the difference is that the subscription can only be received once
  • Change the following
<-- source code analysis --> // Change 1: Single has been redesigned to reaction-Streams architecture, with SingleSubscriber changed to: SingleObserver interface SingleObserver<T> {// Change 2: onSubscribe() void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); } <-- > Single<Long> Single = single.just (1l); single.subscribe(new SingleObserver<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void } @override public void onError(Throwable e) {}}); // common Observable can be converted toSingle by toSingle(), that is, observable.just (1).tosingle ()Copy the code

6.3 change Completable

  • CompletableThe action is similar toObservable= Send data, but the difference is that the subscription can only be acceptedCompleteonErrorThe event
  • Change the following
// Change 1: Completable has been redesigned to reaction-Streams, i.e. CompletableSubscriber: CompletableObserver interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); Completable<Long> Completable = completable.just (1l); Completable.subscribe(new CompletableObserver<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onComplete(Long value) { } @Override public void onError(Throwable e) { } }); // A normal Observable can be converted to a Completable object by toCompletable().Copy the code

7. Usage suggestions

My recommendations for learning & using RxJava in your project are as follows:


Schematic diagram


8. To summarize

  • This article mainly explainsRxJava 2.0Relative to theRxJava 1.0The change of
  • As you can see from above,RxJava 2.0Relative to theRxJava 1.0The biggest change,This is mainly a new implementation of being observed:Flowable& for back pressure (BackpressureThe processing of)

Here, I recommend you to learn more about back pressure, see the article: Android RxJava back pressure strategy: graphic + examples of comprehensive analysis

  • Now, I’m going to go furtherRxjavaRelevant knowledge, interested can continue to pay attention toCarson_Ho android Development Notes

Thumb up, please! Because your encouragement is the biggest power that I write!

Related articles reading

  • Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: create operator Android Rxjava: Android RxJava: A comprehensive tutorial on the functional operators
  • Android RxJava application description: (Unconditional) Network request polling Android RxJava application description: (conditional) Network request polling Android RxJava application description: Network request nested callback Android RxJava: merge data source Android RxJava: merge data from disk/memory cache: merge data from disk/memory cache Android RxJava: Network request error reconnection (Retrofit)

Welcome to attentionCarson_HoJane books!

Share the dry things about Android development from time to time, the pursuit of short, flat, fast, but there is no lack of depth.