1. Create a Retrofit object

OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
                .client(okHttpClient.build())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(base_url)
                .build();Copy the code

Here is the normal Retrofit object creation process, passing in the necessary parameters: OkHttpClient, converterFactory, callAdapterFactory (use Retrofit’s default callAdapterFactory when not working with Rxjava, do nothing), baseUrl.

Paying special attention to is introduced into RxJavaCallAdapterFactory. Here the create () this RxjavaCallAdapter object, that object will completely change the use of the Retrofit. Retrofit with Rxjava is possible thanks to the programming skills of Retrofit’s authors, who opened the CallAdapterFactory interface to make Retrofit more flexible.

2. Create a dynamic proxy object for the interface

Give the interface of experiment

public interface NetApiService {

    / / post request
    @FormUrlEncoded
    @POST("{url}")
    Observable<ResponseBody> executePost(
            @Path("url") String url,
            @Field("params") String params,
            @Field("signature") String signature
    );

}Copy the code
netApiService = retrofit.create(NetApiService .class);  // Return a dynamic proxy objectCopy the code

This is also the magic of Retrofit, passing in an interface, you can generate objects that implement that interface, which of course are just dynamic proxy objects generated by Java code. Now let’s go into the create() method.

public <T> T create(final Class<T> service) {
    Utils.validateServiceInterface(service);  // Verify that the service interface passed in externally is valid
    if (validateEagerly) {
      eagerlyValidateMethods(service);  // Check whether all methods in the interface are cached according to validateMBIT/s
    }
    Use the Proxy factory class to return a generic dynamic Proxy instance.
    return (T) Proxy.newProxyInstance(service.getClassLoader(), newClass<? >[] { service },new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            returnserviceMethod.callAdapter.adapt(okHttpCall); }}); }Copy the code

In this case, we check whether the incoming interface is an interface first, then check whether all methods in the interface are cached according to validatemx. Finally, we use java.lang.reflect.proxy; Create a generic dynamic proxy object and return this object. (Don’t worry if you don’t understand Java dynamic proxy technology, I will provide references at the end of this article.)

3. Create observables

Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);

Call the interface method of the dynamic proxy object

new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            returnserviceMethod.callAdapter.adapt(okHttpCall); }}Copy the code

The invoke() method of InvocationHandler, where we see three arguments: proxy represents the proxy class object generated by proxy.newProxyInstance (). Method represents the function to which the proxy object is called. Args represents the argument of the function to which the proxy object is called. Each function that calls a proxy object actually ends up calling the Invoke function of InvocationHandler.

Since this is the method of the interface, it will not enter the first if, and since it is not the default method, it will not enter the second if either. You can see that our proxy object actually new an okHttpCall<> object after calling the interface’s methods, and then passes this object as a parameter to callAdapter.adapt(); Methods.

Because we are before the incoming RxJavaCallAdapterFactory. The create (), so we further RxJavaCallAdapterFactory. Java see structural approach of the observables, the adapt () can be seen:

static final class ResponseCallAdapter implements CallAdapter<Observable<? >>{
    private final Type responseType;
    private final Scheduler scheduler;

    ResponseCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType(a) {
      return responseType;
    }

    @Override public <R> Observable<Response<R>> adapt(Call<R> call) {
      Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
      if(scheduler ! =null) {
        return observable.subscribeOn(scheduler);
      }
      returnobservable; }}Copy the code

Here we see that the CallOnSubscribe object is constructed by taking the okHttp object passed in as an argument. What is CallOnSubscribe?? In terms of the Rxjava construct Observable, the CallOnSubscribe should be an object that implements the Observable.OnSubscribe

interface.

We look at the source code, sure enough.

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall;
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = newRequestArbiter<>(call, subscriber); subscriber.add(requestArbiter); subscriber.setProducer(requestArbiter); }}Copy the code

You can probably see why calling the interface methods of the generated dynamic proxy object does not return an okHttpCall<> object as with Retrofit alone, but rather an Observable

object. That’s what RxJavaCallAdapterFactory does.

If we look closely at the call() method of CallOnSubscribe

, we see that the subscriber (which is the external observer when the subscribe() method is called) has added a requestArbiter object. This object is important in subscriber.setProducer(requestArbiter); When, it will control the okHttpCall object to get the data directly online and then call back to the observer subscriber.

4.observable.subscribe(subscriber); To subscribe to

There’s not much code here, just one line observable.subscribe(subscriber); . So let’s take a look at what’s happening in the subscribe method, okay? (To give you a preview of the story, the Observable.OnSubscribe

object is great, it acts as a bridge between an Observable and an Observer.)

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
    // Check whether the passed argument, the observed object, is empty.
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */
        }

        // new Subscriber so onStart it
       // Important operations, you can do some preparatory work before subscribing
        subscriber.onStart();

        / * * See https://github.com/ReactiveX/RxJava/issues/216 for the discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */
        // if not already wrapped
        if(! (subscriberinstanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throwr; }}returnSubscriptions.unsubscribed(); }}Copy the code

In this code, we see that onStart() is called first during the subscribe() process. This method is usually called at the beginning of the subscribe but before the event is sent. It can be used to do some preparatory work, such as clearing or resetting data. This is an optional method whose implementation is null by default. Note that onStart() does not apply if there is a requirement for the preparing thread (such as a pop-up dialog showing progress, which must be executed on the main thread) (since it is always called on the thread that subscribe occurs, and cannot specify the thread). To do the preparatory work on the specified thread, use the doOnSubscribe() method.

High energy coming!! Let’s focus on this code:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);Copy the code
 public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }Copy the code

The onSubscribe start () method returns an onSubscribe object and calls the Call (subscriber) method of onSubscribe directly. Remember we just analyzed that we’re actually calling the call() method of the CallOnSubscribe

object. That is, the method of networking to get the data and then calling back the Subscriber observer. (The code is the subscriber. SetProducer (requestArbiter) of the call() method.) .

public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if(subscriber ! =null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true; }}}// do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else{ producer.request(toRequest); }}}Copy the code

Finally, producer.request(toRequest) is called; Methods. RequestArbiter

RequestArbiter = new RequestArbiter<>(call, subscriber); The request of ().

static final class RequestArbiter<T> extends AtomicBoolean implements Subscription.Producer {
    private final Call<T> call;
    private final Subscriber<? super Response<T>> subscriber;

    RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
      this.call = call;
      this.subscriber = subscriber;
    }

    @Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if(! compareAndSet(false.true)) return; // Request was already triggered.

      try {
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response);
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if(! subscriber.isUnsubscribed()) { subscriber.onError(t); }return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }

    @Override public void unsubscribe(a) {
      call.cancel();
    }

    @Override public boolean isUnsubscribed(a) {
      returncall.isCanceled(); }}Copy the code

Response

response = call.execute(); There’s a network request; if (! subscriber.isUnsubscribed()) { subscriber.onNext(response); } here is the callback.

The subsequent onError() callback is the same as the onCompleted() callback and will not be analyzed.

At this point, we have a complete understanding of Retrofit + Rxjava’s process from creating an Observable and Observer to an Observable subscribing to an Observer, as well as the hidden networking and callbacks.

The resources

Retrofit2.1 Source code analysis Java dynamic agent technology Rxjava source code analysis