In RxJava2, when an exception is thrown somewhere in a call chain and no Consumer is registered to handle it, the exception ends up being thrown to the virtual machine.

Ways to subscribe

Observable provides the following subscribe() overloads:


// 1
Disposable subscribe()
// 2
Disposable subscribe(Consumer<? super T> onNext)
// 3
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
// 4
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
// 5
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
// 6
void subscribe(Observer<? super T> observer)


The no-argument overload and the Consumer overloads (1 to 4) fill in default arguments and call overload 5. Overload 5 wraps its arguments in a LambdaObserver, which is an Observer, and then calls overload 6:

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

So subscribing with Consumer arguments and subscribing with an Observer differ only in where the callbacks come from. That difference alone, however, leads to different outcomes when an exception occurs.

Exception handling

We simulate exceptions in several scenarios:

  • 1. Observer onNext throws an exception (with thread switching)
    apiService.newJsonKeyData()
        .doOnSubscribe { t -> compositeDisposable.add(t) }
        .compose(RxScheduler.sync()) // encapsulates the thread switch
        .subscribe(object : Observer<List<ZooData>> {
            override fun onComplete() {}

            override fun onSubscribe(d: Disposable) {}

            override fun onNext(t: List<ZooData>) {
                throw RuntimeException("runtime exception")
            }

            override fun onError(e: Throwable) {
                Log.d("error", e.message)
            }
        })

Result: onError is not called and the app crashes.

  • 2. Observer onNext throws an exception (no thread switching)
    Observable.create<String> {
        it.onNext("ssss")
    }
        .subscribe(object : Observer<String> {
            override fun onComplete() {}

            override fun onSubscribe(d: Disposable) {}

            override fun onNext(t: String) {
                Log.d("result::", t)
                throw RuntimeException("run llllll")
            }

            override fun onError(e: Throwable) {
                Log.e("sss", "sss", e)
            }
        })

Result: onError is called and the app does not crash.

  • 3. The map operator throws an exception (subscribing with an Observer)
    apiService.newJsonKeyData()
        .doOnSubscribe { t -> compositeDisposable.add(t) }
        .map {
            throw RuntimeException("runtime exception")
        }
        .compose(RxScheduler.sync())
        .subscribe(object : Observer<List<ZooData>> {
            override fun onComplete() {}

            override fun onSubscribe(d: Disposable) {}

            override fun onNext(t: List<ZooData>) {}

            override fun onError(e: Throwable) {
                Log.d("error", e.message)
            }
        })

Result: the Observer's onError is called and the app does not crash.

  • 4. Consumer onNext throws an exception
    apiService.newJsonKeyData()
        .doOnSubscribe { t -> compositeDisposable.add(t) }
        .compose(RxScheduler.sync())
        .subscribe({
            throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
        }, {
            Log.d("Error", it.message)
        })

Result A: an errorConsumer is supplied, so the errorConsumer is triggered and the app does not crash.

    apiService.newJsonKeyData()
        .doOnSubscribe { t -> compositeDisposable.add(t) }
        .compose(RxScheduler.sync())
        .subscribe {
            throw RuntimeException("messsasassssssssssssssssssssssssssssssssssssss")
        }

Result B: no errorConsumer is supplied, and the app crashes.

So why these differences? Let’s take a look at the source code.

Why a Consumer subscription may or may not crash

When subscribe() is called with Consumer arguments, Observable wraps them in a LambdaObserver and calls subscribe(LambdaObserver). Here is the relevant part of LambdaObserver (note the handling in onNext and onError):

    ...
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
    ...

onNext calls accept() on the corresponding Consumer inside a try catch, so an exception thrown by our code in the Consumer is caught and routed to onError in the LambdaObserver. In onError, if the subscription has not been disposed and the errorConsumer's accept() runs without throwing, the event stream terminates normally; otherwise RxJavaPlugins.onError(t) is called. When no errorConsumer is passed to subscribe(), Observable uses OnErrorMissingConsumer as the default errorConsumer, which wraps the exception in an OnErrorNotImplementedException and passes it to RxJavaPlugins.onError.
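The routing described above can be sketched in plain Java. This is a simplified model, not RxJava's LambdaObserver: the class name, the `globalHook` list (a stand-in for RxJavaPlugins.onError) and `ON_ERROR_MISSING` (a stand-in for the default error consumer) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of the routing described above; NOT RxJava's LambdaObserver. */
public class LambdaObserverSketch {

    /** Hypothetical stand-in for RxJavaPlugins.onError: records what reaches the global hook. */
    static final List<Throwable> globalHook = new ArrayList<>();

    /** Hypothetical stand-in for the default error consumer used when none is supplied. */
    static final Consumer<Throwable> ON_ERROR_MISSING =
            t -> globalHook.add(new IllegalStateException("onError not implemented", t));

    private final Consumer<String> nextConsumer;
    private final Consumer<Throwable> errorConsumer;
    private boolean disposed;

    LambdaObserverSketch(Consumer<String> nextConsumer, Consumer<Throwable> errorConsumer) {
        this.nextConsumer = nextConsumer;
        this.errorConsumer = errorConsumer;
    }

    void onNext(String value) {
        if (disposed) {
            return;
        }
        try {
            nextConsumer.accept(value);
        } catch (Throwable e) {
            // An exception in the user's onNext code is redirected to onError.
            onError(e);
        }
    }

    void onError(Throwable t) {
        if (disposed) {
            globalHook.add(t);
            return;
        }
        disposed = true;
        try {
            errorConsumer.accept(t);
        } catch (Throwable e) {
            // The error consumer itself failed: fall back to the global hook.
            globalHook.add(e);
        }
    }

    /** With an error consumer: the consumer sees the exception, the global hook sees nothing. */
    public static boolean withErrorConsumer() {
        globalHook.clear();
        List<String> seen = new ArrayList<>();
        LambdaObserverSketch observer = new LambdaObserverSketch(
                v -> { throw new RuntimeException("boom"); },
                t -> seen.add(t.getMessage()));
        observer.onNext("item");
        return seen.equals(List.of("boom")) && globalHook.isEmpty();
    }

    /** Without an error consumer: the default consumer forwards the exception to the global hook. */
    public static boolean withoutErrorConsumer() {
        globalHook.clear();
        LambdaObserverSketch observer = new LambdaObserverSketch(
                v -> { throw new RuntimeException("boom"); },
                ON_ERROR_MISSING);
        observer.onNext("item");
        return globalHook.size() == 1
                && "boom".equals(globalHook.get(0).getCause().getMessage());
    }
}
```

In this model, Result A corresponds to `withErrorConsumer()` and Result B to `withoutErrorConsumer()`, where the exception only stops at whatever the global hook decides to do with it.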

RxJavaPlugins.onError(t)

From the analysis above, the exception eventually flows to RxJavaPlugins.onError(t), which is RxJava2's global, static error hook.

    public static void onError(@NonNull Throwable error) {
        Consumer<? super Throwable> f = errorHandler;

        if (error == null) {
            error = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        } else {
            if (!isBug(error)) {
                error = new UndeliverableException(error);
            }
        }

        if (f != null) {
            try {
                f.accept(error);
                return;
            } catch (Throwable e) {
                // Exceptions.throwIfFatal(e); TODO decide
                e.printStackTrace(); // NOPMD
                uncaught(e);
            }
        }

        error.printStackTrace(); // NOPMD
        uncaught(error);
    }

If errorHandler is not null, it consumes the exception. If errorHandler is null, or it throws a new exception while consuming, RxJava passes the exception to the current thread's uncaught exception handler (which may crash the program). errorHandler is itself a Consumer and can be configured as follows:

    RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
        override fun accept(t: Throwable?) {
            // handle or log the exception here
        }
    })
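To make the fallback order concrete, here is a minimal plain-Java sketch of a global error hook, assuming the flow described above: try the configured handler first, otherwise hand the exception to the current thread's uncaught exception handler. `GlobalErrorHookSketch` and its methods are hypothetical names, not RxJava's API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Simplified model of a global error hook; NOT RxJavaPlugins itself. */
public class GlobalErrorHookSketch {

    private static volatile Consumer<Throwable> errorHandler;

    public static void setErrorHandler(Consumer<Throwable> handler) {
        errorHandler = handler;
    }

    public static void onError(Throwable error) {
        Consumer<Throwable> f = errorHandler;
        if (f != null) {
            try {
                f.accept(error);
                return; // the handler consumed the exception
            } catch (Throwable e) {
                uncaught(e); // the handler itself failed
            }
        }
        uncaught(error);
    }

    /** Hands the exception to the current thread's uncaught exception handler. */
    private static void uncaught(Throwable error) {
        Thread thread = Thread.currentThread();
        thread.getUncaughtExceptionHandler().uncaughtException(thread, error);
    }

    /** A handler is installed, so the exception stops there. */
    public static String demoWithHandler() {
        AtomicReference<String> seen = new AtomicReference<>("nothing");
        setErrorHandler(t -> seen.set("handled: " + t.getMessage()));
        onError(new RuntimeException("boom"));
        setErrorHandler(null);
        return seen.get();
    }

    /** No handler is installed, so the thread's uncaught exception handler receives it. */
    public static String demoWithoutHandler() {
        AtomicReference<String> seen = new AtomicReference<>("nothing");
        Thread current = Thread.currentThread();
        Thread.UncaughtExceptionHandler previous = current.getUncaughtExceptionHandler();
        current.setUncaughtExceptionHandler((th, e) -> seen.set("uncaught: " + e.getMessage()));
        try {
            setErrorHandler(null);
            onError(new RuntimeException("boom"));
        } finally {
            current.setUncaughtExceptionHandler(previous);
        }
        return seen.get();
    }
}
```

On Android, the default uncaught exception handler terminates the process, which is why the second path shows up as an app crash.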

A data transformation operator throws an exception

Taking the map operator as an example, RxJava wraps the upstream in a new Observable, ObservableMap:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

When ObservableMap is subscribed to, it subscribes a static nested class, MapObserver, to the upstream. Focus on MapObserver's onNext method:

        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

onNext wraps mapper.apply(), which runs the function we supplied to the operator, in a try catch. An exception thrown inside a data transformation operator such as map is therefore caught by the operator itself and delivered downstream to the final Observer through onError. If the subscriber can consume the exception, the event stream ends with onError(). If the subscription uses Consumers without an errorConsumer, as described in the previous section, the app crashes for the reason analyzed there.
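A minimal plain-Java sketch of this operator pattern follows. `SimpleObserver` and this `MapObserver` are hypothetical stand-ins written for illustration; they only model the try catch around the mapper, not RxJava's real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Simplified model of a map operator that catches mapper failures; NOT RxJava's ObservableMap. */
public class MapOperatorSketch {

    /** Hypothetical minimal observer interface used only in this sketch. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    static final class MapObserver<T, R> implements SimpleObserver<T> {
        private final SimpleObserver<R> downstream;
        private final Function<T, R> mapper;
        private boolean done;

        MapObserver(SimpleObserver<R> downstream, Function<T, R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return;
            }
            R mapped;
            try {
                mapped = mapper.apply(value);
            } catch (Throwable ex) {
                // The operator catches the mapper's exception and signals it downstream.
                done = true;
                downstream.onError(ex);
                return;
            }
            downstream.onNext(mapped);
        }

        @Override
        public void onError(Throwable error) {
            if (done) {
                return;
            }
            done = true;
            downstream.onError(error);
        }
    }

    /** Feeds three items through the operator; the second one makes the mapper throw. */
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        SimpleObserver<Integer> downstream = new SimpleObserver<Integer>() {
            @Override
            public void onNext(Integer value) {
                events.add("next:" + value);
            }

            @Override
            public void onError(Throwable error) {
                events.add("error:" + error.getMessage());
            }
        };
        MapObserver<String, Integer> map = new MapObserver<String, Integer>(downstream, s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty input");
            }
            return s.length();
        });
        map.onNext("a");
        map.onNext("");   // mapper throws, downstream receives onError
        map.onNext("bb"); // ignored, the stream has already terminated
        return events;
    }
}
```

The key point is that the failure happens inside the operator's own try catch, so it always travels downstream as an onError signal regardless of which thread the operator runs on.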

An exception is thrown in the Observer onNext

Scenario 1 is a network request, which involves thread switching. Scenario 2 creates an Observable directly, without thread switching. The outcomes differ: after a thread switch, an exception thrown in the Observer's onNext() does not trigger onError(), and the program crashes.

Observable.create without switching threads

Looking at the source of create(), an ObservableCreate object is created internally, and its subscribeActual() method runs when subscribe() is called. Inside subscribeActual(), the event stream is started by calling subscribe() on the ObservableOnSubscribe object we passed to create().

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // Wrap our observer in a CreateEmitter, whose emit methods forward to it
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // source is the ObservableOnSubscribe implementation passed to create()
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

The subscription in the code above is wrapped in a try catch. Subscribing and emitting the events all happen on one thread, so an exception anywhere in the event stream can be caught here. If you use observeOn() to switch the thread on which events are delivered, the exception can no longer be caught here and the program crashes.

Exception handling involving thread transitions

The Observable that Retrofit returns for a network request is essentially the BodyObservable created in RxJava2CallAdapter, and its inner onNext does not catch exceptions. Whether it catches them is not the root cause of the crash, though: a network request necessarily involves thread switching, so even a try catch here would not catch exceptions thrown further downstream.

    @Override public void onNext(Response<R> response) {
      if (response.isSuccessful()) {
        observer.onNext(response.body());
      } else {
        terminated = true;
        Throwable t = new HttpException(response);
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }

For example, an exception thrown in the final Observer's onNext has to be caught on the thread that finally invokes it, namely the Android main thread selected by observeOn(AndroidSchedulers.mainThread()). As with other operators, a thread switch adds a new layer of subscription: RxJava internally creates a new ObservableObserveOn, whose observer's onNext only enqueues the item and schedules a task:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        ...
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this); // runs this object's run() method on the worker
            }
        }
        ...
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

The worker that runs the task is the Worker of the target thread's Scheduler. Taking AndroidSchedulers.mainThread() as an example, the Scheduler implementation is HandlerScheduler, the corresponding Worker is HandlerWorker, and ScheduledRunnable runs the final task.

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed; // Tracked solely for isDisposed().

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

You can see a try catch in run(). However, the catch block hands the exception to the global handler RxJavaPlugins.onError(t) rather than to the observer's onError. So after a thread-switching operator, the observer's onError cannot receive an exception thrown from the observer's own onNext.
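The effect can be reproduced in a small plain-Java model, assuming a single-thread executor plays the role of the target scheduler. `SimpleObserver`, `globalOnError` and this `ScheduledRunnable` are hypothetical stand-ins that only mirror the try catch shown above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified model of a scheduled task whose failures go to a global hook; NOT RxAndroid's code. */
public class ScheduledRunnableSketch {

    static final List<String> log = Collections.synchronizedList(new ArrayList<>());

    /** Hypothetical minimal observer interface used only in this sketch. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Hypothetical stand-in for RxJavaPlugins.onError. */
    static void globalOnError(Throwable t) {
        log.add("global hook: " + t.getMessage());
    }

    static final class ScheduledRunnable implements Runnable {
        private final Runnable delegate;

        ScheduledRunnable(Runnable delegate) {
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                // The task has no reference to the observer, only to the global hook.
                globalOnError(t);
            }
        }
    }

    /** Delivers one item on another thread to an observer whose onNext throws. */
    public static List<String> demo() {
        log.clear();
        SimpleObserver<String> observer = new SimpleObserver<String>() {
            @Override
            public void onNext(String value) {
                throw new RuntimeException("thrown in onNext");
            }

            @Override
            public void onError(Throwable error) {
                log.add("observer.onError: " + error.getMessage());
            }
        };
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.execute(new ScheduledRunnable(() -> observer.onNext("item")));
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }
}
```

In this model the observer's onError is never invoked; only the global hook records the exception, which matches the crash seen in scenario 1 when no global handler is registered.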

Solutions

Now that the cause is known, the solutions are clear.

1. Register a global exception handler

    RxJavaPlugins.setErrorHandler(object : Consumer<Throwable> {
        override fun accept(t: Throwable?) {
            // do something
        }
    })

2. When a Consumer acts as the observer, also pass an error Consumer unless you are completely sure that no exception can occur

    apiService.stringData()
        .doOnSubscribe { t -> compositeDisposable.add(t) }
        .compose(RxScheduler.sync())
        .subscribe(Consumer<Boolean> { }, Consumer<Throwable> { })

3. For an Observer, create a BaseObserver that wraps the work in onNext in a try catch and forwards any exception to onError manually. All observers in the project then extend this BaseObserver.

    @Override
    public void onNext(T t) {
        try {
            onSuccess(t);
            // Record the result only when onSuccess completed without throwing
            data = t;
            success = true;
        } catch (Exception e) {
            onError(e);
        }
    }
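A fuller version of such a base class might look like the sketch below. It is written in plain Java against a hypothetical `SimpleObserver` interface rather than RxJava's Observer, and the `onSuccess`/`onFailure` split and the `terminated` flag are assumptions added for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a base observer that redirects onNext failures to onError; names are hypothetical. */
public class BaseObserverSketch {

    /** Hypothetical minimal observer interface used only in this sketch. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    abstract static class BaseObserver<T> implements SimpleObserver<T> {
        private boolean terminated;

        /** Subclasses put their onNext logic here; it is allowed to throw. */
        protected abstract void onSuccess(T value);

        /** Subclasses handle both upstream errors and failures of onSuccess here. */
        protected abstract void onFailure(Throwable error);

        @Override
        public final void onNext(T value) {
            if (terminated) {
                return;
            }
            try {
                onSuccess(value);
            } catch (Exception e) {
                onError(e);
            }
        }

        @Override
        public final void onError(Throwable error) {
            if (terminated) {
                return;
            }
            terminated = true;
            onFailure(error);
        }
    }

    /** Delivers three items; handling the second one throws. */
    public static List<String> demo() {
        List<String> events = new ArrayList<>();
        BaseObserver<String> observer = new BaseObserver<String>() {
            @Override
            protected void onSuccess(String value) {
                if (value.equals("bad")) {
                    throw new IllegalStateException("cannot handle bad");
                }
                events.add("success:" + value);
            }

            @Override
            protected void onFailure(Throwable error) {
                events.add("failure:" + error.getMessage());
            }
        };
        observer.onNext("ok");
        observer.onNext("bad");  // onSuccess throws, onFailure is called
        observer.onNext("late"); // ignored after the failure
        return events;
    }
}
```

In a real RxJava Observer you would probably also keep the Disposable received in onSubscribe and dispose it when onSuccess fails, so the upstream stops emitting; that part is omitted here.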