Preface

This article picks up where the previous one left off. The previous article analyzed the design of Retrofit's network request flow and walked through each stage of code execution. This one focuses on CallAdapter: how Retrofit usage changes when it is combined with RxJava or Java8, and how the adapters for RxJava and Java8 are implemented.

P.S. The RxJava part of this article covers RxJava2 only.

RxJava2

Gradle dependencies covered in this section:

implementation 'com.squareup.retrofit2:retrofit:2.9.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.9.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

We start with RxJava2. After adding the adapter dependency above, register a CallAdapter.Factory for RxJava2 when initializing Retrofit:

retrofit = new Retrofit.Builder()
        .baseUrl("https://api.github.com")
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create())
        .build();


// RxJava2CallAdapterFactory.java
public static RxJava2CallAdapterFactory create() {
  return new RxJava2CallAdapterFactory(null, false);
}

private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
  this.scheduler = scheduler;
  this.isAsync = isAsync;
}

The RxJava2CallAdapterFactory constructor takes two parameters:

  • scheduler: specifies the Scheduler used for subscribeOn.
  • isAsync: selects between OkHttp's synchronous and asynchronous execution. P.S. The source analysis in this article follows the synchronous path only.

Thus, the network request interface can be defined as follows:

@GET("users/{user}/repos")
Observable<List<Repo>> listReposRx(@Path("user") String user);

RxJava2CallAdapterFactory

Because the declared return type of the interface method is Observable, loadServiceMethod matches it to RxJava2CallAdapterFactory and invokes its get method, which returns an RxJava2CallAdapter object.

// RxJava2CallAdapterFactory.java
public @Nullable CallAdapter<?, ?> get(
    Type returnType, Annotation[] annotations, Retrofit retrofit) {
  Class<?> rawType = getRawType(returnType);

  if (rawType == Completable.class) {
    // Completable is not parameterized (which is what the rest of this method deals with) so it
    // can only be created with a single configuration.
    return new RxJava2CallAdapter(
        Void.class, scheduler, isAsync, false, true, false, false, false, true);
  }

  boolean isFlowable = rawType == Flowable.class;
  boolean isSingle = rawType == Single.class;
  boolean isMaybe = rawType == Maybe.class;
  if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
    return null;
  }

  boolean isResult = false;
  boolean isBody = false;
  Type responseType;
  if (!(returnType instanceof ParameterizedType)) {
    String name =
        isFlowable ? "Flowable" : isSingle ? "Single" : isMaybe ? "Maybe" : "Observable";
    throw new IllegalStateException(
        name
            + " return type must be parameterized"
            + " as "
            + name
            + "<Foo> or "
            + name
            + "<? extends Foo>");
  }

  Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
  Class<?> rawObservableType = getRawType(observableType);
  if (rawObservableType == Response.class) {
    if (!(observableType instanceof ParameterizedType)) {
      throw new IllegalStateException(
          "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>");
    }
    responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
  } else if (rawObservableType == Result.class) {
    if (!(observableType instanceof ParameterizedType)) {
      throw new IllegalStateException(
          "Result must be parameterized" + " as Result<Foo> or Result<? extends Foo>");
    }
    responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    isResult = true;
  } else {
    responseType = observableType;
    isBody = true;
  }

  return new RxJava2CallAdapter(
      responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
}

As you can see from the code, the Adapter also supports RxJava’s Completable, Flowable, Single, and Maybe.
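
To make the type checks in get easier to follow, here is a minimal sketch that uses only java.lang.reflect. It is not Retrofit's code: DemoService, rawType, parameterUpperBound and responseTypeOf are hypothetical names, and Optional stands in for Observable so that the example runs without RxJava.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.List;
import java.util.Optional;

class ReturnTypeInspection {
  // Hypothetical service interface. Optional stands in for Observable here.
  interface DemoService {
    Optional<List<String>> listNames();

    Optional<? extends Number> count();

    @SuppressWarnings("rawtypes")
    Optional rawValue();

    String plain();
  }

  // Returns the raw class of a plain or parameterized type.
  static Class<?> rawType(Type type) {
    if (type instanceof Class<?>) {
      return (Class<?>) type;
    }
    if (type instanceof ParameterizedType) {
      return (Class<?>) ((ParameterizedType) type).getRawType();
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  // Returns the type argument at index, unwrapping "? extends Foo" to Foo.
  static Type parameterUpperBound(int index, ParameterizedType type) {
    Type argument = type.getActualTypeArguments()[index];
    if (argument instanceof WildcardType) {
      return ((WildcardType) argument).getUpperBounds()[0];
    }
    return argument;
  }

  // Mirrors the factory's decision: null means "not my type",
  // otherwise the inner (response) type is returned.
  static Type responseTypeOf(String methodName) {
    Type returnType;
    try {
      returnType = DemoService.class.getMethod(methodName).getGenericReturnType();
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
    if (rawType(returnType) != Optional.class) {
      return null;
    }
    if (!(returnType instanceof ParameterizedType)) {
      throw new IllegalStateException("Optional return type must be parameterized");
    }
    return parameterUpperBound(0, (ParameterizedType) returnType);
  }

  public static void main(String[] args) {
    System.out.println(responseTypeOf("listNames"));
    System.out.println(responseTypeOf("count"));
    System.out.println(responseTypeOf("plain"));
  }
}
```

The three outcomes correspond to the branches in the real factory: an unrelated return type yields null so that another factory can be tried, a raw type is rejected with an IllegalStateException, and a parameterized type yields the inner type that is later handed to the converter.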

RxJava2CallAdapter

Next, take a look at the RxJava2CallAdapter#adapt method:

// RxJava2CallAdapter.java
public Object adapt(Call<R> call) {
  Observable<Response<R>> responseObservable =
      isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);

  Observable<?> observable;
  if (isResult) {
    observable = new ResultObservable<>(responseObservable);
  } else if (isBody) {
    observable = new BodyObservable<>(responseObservable);
  } else {
    observable = responseObservable;
  }

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isFlowable) {
    return observable.toFlowable(BackpressureStrategy.LATEST);
  }
  if (isSingle) {
    return observable.singleOrError();
  }
  if (isMaybe) {
    return observable.singleElement();
  }
  if (isCompletable) {
    return observable.ignoreElements();
  }
  return RxJavaPlugins.onAssembly(observable);
}

  • First, an Observable is created according to isAsync. CallExecuteObservable wraps OkHttp's synchronous execution logic, and the OkHttpCall object, which encapsulates the OkHttp request logic, is passed into it.
  • The Observable is then converted according to the return type declared on the interface method.

@GET("users/{user}/repos")
Observable<List<Repo>> listReposRx(@Path("user") String user);

So for the interface method defined at the beginning, the Observable that is returned is actually backed by a CallExecuteObservable or CallEnqueueObservable (wrapped in a BodyObservable here, because the declared type is the response body).

CallExecuteObservable

Now look at how the request is made from the calling side:

Observable<List<Repo>> observable = service.listReposRx("octocat");
observable.subscribe(
    new Consumer<List<Repo>>() {
        @Override
        public void accept(List<Repo> repos) throws Exception {}
    },
    new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {}
    },
    new Action() {
        @Override
        public void run() throws Exception {}
    });

Next, the logic of CallExecuteObservable:

// CallExecuteObservable.java
final class CallExecuteObservable<T> extends Observable<Response<T>> {
  private final Call<T> originalCall;

  CallExecuteObservable(Call<T> originalCall) {
    this.originalCall = originalCall;
  }

  @Override
  protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Since Call is a one-shot type, clone it for each new observer.
    Call<T> call = originalCall.clone();
    CallDisposable disposable = new CallDisposable(call);
    observer.onSubscribe(disposable);
    if (disposable.isDisposed()) {
      return;
    }

    boolean terminated = false;
    try {
      Response<T> response = call.execute();
      if (!disposable.isDisposed()) {
        observer.onNext(response);
      }
      if (!disposable.isDisposed()) {
        terminated = true;
        observer.onComplete();
      }
    } catch (Throwable t) {
      Exceptions.throwIfFatal(t);
      if (terminated) {
        RxJavaPlugins.onError(t);
      } else if (!disposable.isDisposed()) {
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }
  }
  ...
}

An external call to observable.subscribe eventually reaches CallExecuteObservable#subscribeActual, and that is where the network request is actually initiated. If this is unfamiliar, see my RxJava article: What does Rx Observable.create do?

  • The network request is initiated synchronously through call.execute().
  • Once the response arrives, it is delivered through the onNext callback.

At this point, the loop from initiating the request to receiving the response is closed.
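
The two points worth remembering from subscribeActual are that nothing runs until someone subscribes, and that every subscriber triggers its own execution (which is why the original clones the Call). The sketch below models only that behavior with plain JDK types. ColdRequest is a hypothetical stand-in, not RxJava or Retrofit code, and it leaves out disposal handling entirely.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for CallExecuteObservable; it is not RxJava code.
class ColdRequest<T> {
  private final Callable<T> request;

  ColdRequest(Callable<T> request) {
    this.request = request;
  }

  // Runs the request on the calling thread, once per subscriber.
  void subscribe(Consumer<? super T> onNext, Consumer<Throwable> onError, Runnable onComplete) {
    T value;
    try {
      value = request.call(); // plays the role of call.execute()
    } catch (Exception e) {
      onError.accept(e);
      return;
    }
    onNext.accept(value);
    onComplete.run();
  }

  // Returns how many times the "request" ran after two subscriptions.
  static int demo() {
    AtomicInteger executions = new AtomicInteger();
    ColdRequest<String> cold =
        new ColdRequest<>(() -> "response #" + executions.incrementAndGet());
    int before = executions.get(); // still 0: creating the object starts nothing
    cold.subscribe(v -> {}, e -> {}, () -> {});
    cold.subscribe(v -> {}, e -> {}, () -> {});
    return executions.get() - before;
  }

  public static void main(String[] args) {
    System.out.println("executions after two subscribers: " + demo());
  }
}
```

Because the work happens inside subscribe, the thread that calls subscribe is also the thread that performs the request. That detail is what the next section on thread switching builds on.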

Thread switching

As mentioned in the previous article, Retrofit's default implementation switches threads when delivering the response. It should be added that whether a thread switch is needed also depends on whether OkHttp runs synchronously or asynchronously: in synchronous execution, OkHttp runs the request on the current thread. Asynchronous execution, of course, first checks maxRequests and maxRequestsPerHost.

For example, the current code declaration is:

observable
    .subscribeOn(Schedulers.io())
    .subscribe(
        new Consumer<List<Repo>>() {
            @Override
            public void accept(List<Repo> repos) throws Exception {}
        },
        new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {}
        },
        new Action() {
            @Override
            public void run() throws Exception {}
        });

So CallExecuteObservable#subscribeActual runs on a Schedulers.io() thread, call.execute() runs on that same thread, and the onNext callback is delivered on it as well.

To switch to asynchronous execution, change the factory when initializing Retrofit:

.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync())

In this case CallEnqueueObservable#subscribeActual still runs on a Schedulers.io() thread and calls call.enqueue(). Because the request is then managed by OkHttp's thread pool, the callback arrives on a thread from that pool, and onNext is delivered on that OkHttp thread.

P.S. With asynchronous execution, OkHttp enforces limits such as the maximum number of concurrent requests and the maximum number of requests per host. Synchronous execution relies on the thread pool behind the Scheduler passed to RxJava's subscribeOn. Choose between them according to your situation.
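
The difference between the two modes can be reproduced with plain executors. In this sketch the pool named "rx-io" plays the role of Schedulers.io() and the pool named "http-dispatcher" plays the role of OkHttp's dispatcher. Both names, and the class CallbackThreadDemo, are made up for illustration. Nothing here calls RxJava or OkHttp.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class CallbackThreadDemo {
  // Single-thread pool whose only thread carries the given name.
  static ExecutorService named(String name) {
    return Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, name);
      t.setDaemon(true);
      return t;
    });
  }

  // Synchronous style: the callback runs on whichever thread subscribed.
  static void executeSync(Consumer<String> onNext) {
    onNext.accept(Thread.currentThread().getName());
  }

  // Asynchronous style: the client's own pool does the work and runs the callback.
  static void enqueueAsync(ExecutorService clientPool, Consumer<String> onNext) {
    clientPool.execute(() -> onNext.accept(Thread.currentThread().getName()));
  }

  // Subscribes on "rx-io" and reports the name of the thread that ran the callback.
  static String callbackThread(boolean async) {
    ExecutorService io = named("rx-io");
    ExecutorService client = named("http-dispatcher");
    CompletableFuture<String> seen = new CompletableFuture<>();
    try {
      io.execute(() -> {
        if (async) {
          enqueueAsync(client, seen::complete);
        } else {
          executeSync(seen::complete);
        }
      });
      return seen.join();
    } finally {
      io.shutdown();
      client.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("sync callback on:  " + callbackThread(false));
    System.out.println("async callback on: " + callbackThread(true));
  }
}
```

In the synchronous case the callback thread is the subscribing thread. In the asynchronous case the subscribing thread only hands the work over, so the callback comes back on the client's pool regardless of what subscribeOn specified.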

To switch threads for the response callback, declare observeOn at the call site (AndroidSchedulers comes from the RxAndroid dependency):

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

This moves the onNext callback onto the main thread.

P.S. For more on thread switching in RxJava, see my other article: subscribeOn and observeOn source code parsing for RxJava

A few development tips

In RESTful API design, the client and the back end usually agree on a fixed response format, such as:

{
    "errCode": "",
    "errMessage": "",
    "data": {}
}

We can then define a fixed response entity, which I'm sure many people already do:

public class XResponse<T> {
    private String errCode;
    private String errMessage;
    private T data;

    public String getErrCode() {
        return errCode;
    }

    public void setErrCode(String errCode) {
        this.errCode = errCode;
    }

    public String getErrMessage() {
        return errMessage;
    }

    public void setErrMessage(String errMessage) {
        this.errMessage = errMessage;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }
}

In general, we also want a single common place to check the status returned by the back end. Combining RxJava with this entity, the common layer can be encapsulated in two ways.

Custom Observer

abstract class HttpResultObserver<T> extends ResourceObserver<XResponse<T>> {
    @Override
    public void onNext(@NonNull XResponse<T> tResponse) {
        // Constant-first comparison avoids a NullPointerException when errCode is null.
        if ("OK".equals(tResponse.getErrCode())) {
            onSuccess(tResponse.getData());
        } else {
            onFailed(new Exception(tResponse.getErrMessage()));
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {
        onFailed(new Exception(e));
    }

    @Override
    public void onComplete() {}

    abstract void onSuccess(T data);

    abstract void onFailed(Exception e);
}

Observable<XResponse<List<Repo>>> observable = service.listReposRx2("abc");

Disposable disposable = observable.subscribeWith(new HttpResultObserver<List<Repo>>() {
    @Override
    void onSuccess(List<Repo> data) {}

    @Override
    void onFailed(Exception e) {}
});

Here a ResourceObserver subclass checks the value of errCode in onNext to decide whether the network request succeeded or failed.
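
The success check itself does not depend on RxJava, so one option is to keep it in a small pure function that both approaches in this section can share and that can be unit tested on its own. The following is only a sketch under assumptions: Envelope is a cut-down stand-in for XResponse, ApiException and unwrap are hypothetical names, and "OK" is the success code used in the example above.

```java
import java.util.Objects;

class EnvelopeCheck {
  // Minimal stand-in for the XResponse entity.
  static final class Envelope<T> {
    final String errCode;
    final String errMessage;
    final T data;

    Envelope(String errCode, String errMessage, T data) {
      this.errCode = errCode;
      this.errMessage = errMessage;
      this.data = data;
    }
  }

  // Hypothetical exception type that keeps the backend error code.
  static final class ApiException extends RuntimeException {
    final String errCode;

    ApiException(String errCode, String message) {
      super(message);
      this.errCode = errCode;
    }
  }

  // Returns the payload on success, otherwise throws ApiException.
  static <T> T unwrap(Envelope<T> envelope) {
    Objects.requireNonNull(envelope, "envelope");
    // Constant-first equals also treats a missing (null) errCode as a failure.
    if ("OK".equals(envelope.errCode)) {
      return envelope.data;
    }
    throw new ApiException(envelope.errCode, envelope.errMessage);
  }

  public static void main(String[] args) {
    System.out.println(unwrap(new Envelope<>("OK", "", "payload")));
  }
}
```

Keeping the error code on the exception, instead of only the message, lets the calling side distinguish business errors from transport errors later on.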

Custom Observable

The second approach is a custom Observable built on RxJava:

public class HttpResultObservable<T> extends Observable<T> {
    final Observable<XResponse<T>> source;

    public HttpResultObservable(Observable<XResponse<T>> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        HttpResultObserver<T> parent = new HttpResultObserver<>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }

    static class HttpResultObserver<T> extends ResourceObserver<XResponse<T>> {
        final Observer<? super T> observer;

        HttpResultObserver(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(@NonNull XResponse<T> txResponse) {
            // Constant-first comparison avoids a NullPointerException when errCode is null.
            if ("OK".equals(txResponse.getErrCode())) {
                observer.onNext(txResponse.getData());
            } else {
                observer.onError(new Exception(txResponse.getErrMessage()));
            }
        }

        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}

Observable<XResponse<List<Repo>>> observable = service.listReposRx2("abc");

Disposable disposable = new HttpResultObservable<List<Repo>>(observable)
        .subscribe(
            new Consumer<List<Repo>>() {
                @Override
                public void accept(List<Repo> repos) throws Exception {}
            },
            new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {}
            });

Either way you get back a Disposable, so dispose() can be called in step with the Activity lifecycle to avoid memory leaks.

Java8

I currently use Kotlin for most of my day-to-day Android development, and Retrofit + RxJava is the mainstream combination. Still, Java8 is by now a fairly old release, and its features are worth knowing. In addition, Android Gradle Plugin 7.0 requires JDK 11 to run. So this article also looks at Retrofit's CallAdapter support for Java8.

The complete code for a network request using Java8 types:

CompletableFuture<List<Repo>> completableFuture = service.listReposJava8("octocat");
try {
    List<Repo> list = completableFuture.get();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

Recent versions of Retrofit support Java8 types by default; on Android this requires API level 24 or above. During Retrofit.Builder#build, a CompletableFutureCallAdapterFactory is added to the list of call adapter factories.

P.S. CompletableFuture was introduced in Java8.

// Platform.java
static final class Android extends Platform {
  Android() {
    super(Build.VERSION.SDK_INT >= 24); // hasJava8Types is true for API 24 and above
  }
  ...
}

List<? extends CallAdapter.Factory> defaultCallAdapterFactories(
    @Nullable Executor callbackExecutor) {
  DefaultCallAdapterFactory executorFactory = new DefaultCallAdapterFactory(callbackExecutor);
  return hasJava8Types
      ? asList(CompletableFutureCallAdapterFactory.INSTANCE, executorFactory)
      : singletonList(executorFactory);
}
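
The order of this list matters, because Retrofit asks each factory in turn and uses the first one whose get returns a non-null adapter. The sketch below models that lookup with plain functions. FactoryChain, the adapter names, and the use of Runnable as a stand-in for Call are all assumptions made for illustration, not Retrofit's actual classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FactoryChain {
  // A "factory" maps a raw return type to an adapter name,
  // or returns null when it does not handle that type.
  static final Function<Class<?>, String> FUTURE_FACTORY =
      raw -> raw == CompletableFuture.class ? "future-adapter" : null;

  // Runnable stands in for Retrofit's Call type in this model.
  static final Function<Class<?>, String> DEFAULT_FACTORY =
      raw -> raw == Runnable.class ? "default-adapter" : null;

  // Same shape as defaultCallAdapterFactories: the flag decides what is registered.
  static List<Function<Class<?>, String>> defaultFactories(boolean hasJava8Types) {
    return hasJava8Types
        ? Arrays.asList(FUTURE_FACTORY, DEFAULT_FACTORY)
        : Collections.singletonList(DEFAULT_FACTORY);
  }

  // The first factory that returns a non-null adapter wins.
  static String adapterFor(Class<?> rawReturnType, boolean hasJava8Types) {
    for (Function<Class<?>, String> factory : defaultFactories(hasJava8Types)) {
      String adapter = factory.apply(rawReturnType);
      if (adapter != null) {
        return adapter;
      }
    }
    throw new IllegalArgumentException("No adapter for " + rawReturnType.getName());
  }

  public static void main(String[] args) {
    System.out.println(adapterFor(CompletableFuture.class, true));
    System.out.println(adapterFor(Runnable.class, true));
  }
}
```

With the flag off, a CompletableFuture return type finds no matching factory, which is the model's equivalent of declaring a CompletableFuture method on a platform without Java8 types.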

CompletableFutureCallAdapterFactory

// CompletableFutureCallAdapterFactory.java
public @Nullable CallAdapter<?, ?> get(
    Type returnType, Annotation[] annotations, Retrofit retrofit) {
  if (getRawType(returnType) != CompletableFuture.class) {
    return null;
  }
  if (!(returnType instanceof ParameterizedType)) {
    throw new IllegalStateException(
        "CompletableFuture return type must be parameterized"
            + " as CompletableFuture<Foo> or CompletableFuture<? extends Foo>");
  }
  Type innerType = getParameterUpperBound(0, (ParameterizedType) returnType);

  if (getRawType(innerType) != Response.class) {
    // Generic type is not Response<T>. Use it for body-only adapter.
    return new BodyCallAdapter<>(innerType);
  }

  // Generic type is Response<T>. Extract T and create the Response version of the adapter.
  if (!(innerType instanceof ParameterizedType)) {
    throw new IllegalStateException(
        "Response must be parameterized" + " as Response<Foo> or Response<? extends Foo>");
  }
  Type responseType = getParameterUpperBound(0, (ParameterizedType) innerType);
  return new ResponseCallAdapter<>(responseType);
}

CompletableFutureCallAdapterFactory#get returns a different CallAdapter depending on the generic type argument of the CompletableFuture return type. Here we take the BodyCallAdapter as an example.

BodyCallAdapter

// CompletableFutureCallAdapterFactory.java
private static final class BodyCallAdapter<R> implements CallAdapter<R, CompletableFuture<R>> {
  private final Type responseType;

  BodyCallAdapter(Type responseType) {
    this.responseType = responseType;
  }

  @Override
  public Type responseType() {
    return responseType;
  }

  @Override
  public CompletableFuture<R> adapt(final Call<R> call) {
    CompletableFuture<R> future = new CallCancelCompletableFuture<>(call);
    call.enqueue(new BodyCallback(future));
    return future;
  }

  @IgnoreJRERequirement
  private class BodyCallback implements Callback<R> {
    private final CompletableFuture<R> future;

    public BodyCallback(CompletableFuture<R> future) {
      this.future = future;
    }

    @Override
    public void onResponse(Call<R> call, Response<R> response) {
      if (response.isSuccessful()) {
        future.complete(response.body());
      } else {
        future.completeExceptionally(new HttpException(response));
      }
    }

    @Override
    public void onFailure(Call<R> call, Throwable t) {
      future.completeExceptionally(t);
    }
  }
}

As adapt shows, what is ultimately returned is a CallCancelCompletableFuture, which is what the caller receives as its CompletableFuture. This class only adds cancellation of the network request and is not covered further here.

The call here is an OkHttpCall. In this scenario, the network request is initiated (call.enqueue) as soon as adapt is called.

When the response arrives, the corresponding CompletableFuture method is called to signal completion or failure.
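
The pattern behind BodyCallAdapter is a general one: turn a callback-style API into a CompletableFuture and forward cancellation back to the underlying call. Here is a minimal sketch of that pattern with JDK types only. AsyncCall, Callback, CancelingFuture and adapt are hypothetical stand-ins written for this example, and the cancel override is my assumption about a reasonable way to forward cancellation, not a copy of Retrofit's class.

```java
import java.util.concurrent.CompletableFuture;

class FutureBridge {
  // Hypothetical callback-style API standing in for Retrofit's Callback.
  interface Callback<T> {
    void onResponse(T body);

    void onFailure(Throwable error);
  }

  // Hypothetical stand-in for Retrofit's Call.
  interface AsyncCall<T> {
    void enqueue(Callback<T> callback);

    void cancel();
  }

  // Future that forwards cancellation to the underlying call.
  static final class CancelingFuture<T> extends CompletableFuture<T> {
    private final AsyncCall<T> call;

    CancelingFuture(AsyncCall<T> call) {
      this.call = call;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      if (mayInterruptIfRunning) {
        call.cancel();
      }
      return super.cancel(mayInterruptIfRunning);
    }
  }

  // Starts the call immediately and completes the future from the callback.
  static <T> CompletableFuture<T> adapt(AsyncCall<T> call) {
    CompletableFuture<T> future = new CancelingFuture<>(call);
    call.enqueue(new Callback<T>() {
      @Override
      public void onResponse(T body) {
        future.complete(body);
      }

      @Override
      public void onFailure(Throwable error) {
        future.completeExceptionally(error);
      }
    });
    return future;
  }
}
```

Note that adapt starts the request eagerly. That is the opposite of the RxJava adapter, where nothing happens until subscribe is called.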

Getting the request result

try {
    List<Repo> list = completableFuture.get();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

  • completableFuture.get() blocks until complete/completeExceptionally is called from the callback shown above.
  • If the request fails, get throws an ExecutionException that wraps the cause; if the waiting thread is interrupted, it throws InterruptedException.

The call to completableFuture.get() stays on the current thread, while the network request itself is issued asynchronously and runs on a thread from OkHttp's thread pool. Internally, get uses a spin-like waiting loop that keeps the calling thread waiting until the OkHttp thread receives the response and the result is available.
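
Since get blocks the calling thread, it helps to compare it with a non-blocking alternative. The sketch below uses only the JDK: describeBlocking unwraps the ExecutionException to reach the original failure, and describeAsync uses CompletableFuture#handle to react without blocking. FutureResultDemo and both method names are made up for this example, and the futures are completed by hand instead of by a real network call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureResultDemo {
  // Blocking style: waits for the result and unwraps the failure cause.
  static String describeBlocking(CompletableFuture<String> future) {
    try {
      return "ok: " + future.get();
    } catch (ExecutionException e) {
      // The original failure is the cause of the ExecutionException.
      return "failed: " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      return "interrupted";
    }
  }

  // Non-blocking style: the function runs once the future completes.
  static CompletableFuture<String> describeAsync(CompletableFuture<String> future) {
    return future.handle(
        (body, error) -> error == null ? "ok: " + body : "failed: " + error.getMessage());
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new java.io.IOException("timeout"));
    System.out.println(describeBlocking(CompletableFuture.completedFuture("body")));
    System.out.println(describeBlocking(failed));
  }
}
```

On Android the blocking form should be kept off the main thread. The handle form avoids the wait, but its function runs on whichever thread completes the future, which in the Retrofit case would be an OkHttp thread.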

Conclusion

This article introduced Retrofit's CallAdapters, focusing on the RxJava2 and Java8 adaptations. Retrofit contains many other interesting designs, and if you have the time I highly recommend reading through the logic yourself; it will help sharpen your own programming ideas.