A rough analysis of the OkHttp request flow, organized around its design patterns

Let’s start with a concrete use of OkHttp:

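    val okHttpClient = OkHttpClient()
    // val okHttpClient = OkHttpClient().newBuilder().build()
    // Placeholder URL; build() throws IllegalStateException if no url is set
    val request = Request.Builder().url("https://example.com/").build()
    val call = okHttpClient.newCall(request)

    // Asynchronous request
    call.enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
        }

        override fun onResponse(call: Call, response: Response) {
        }
    })

    // Synchronous request (a Call can only be executed once, so use one or the other)
    val response = call.execute()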

Let’s start with asynchronous requests:

First, let’s look at how OkHttpClient is created

    public OkHttpClient() {
      this(new Builder());
    }

    // ------------------ Builder ------------------
    public Builder() {
      // Create the dispatcher, which schedules Calls
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

Then let’s look at the creation of the Request

    Request.Builder().build()

    // Request has a single, non-public constructor,
    // so it can only be created through the Builder
    Request(Builder builder)

Let’s see what the Builder does internally

    public Builder() {
      // The default request method is GET
      this.method = "GET";
      this.headers = new Headers.Builder();
    }

    public Request build() {
      if (url == null) throw new IllegalStateException("url == null");
      return new Request(this);
    }

The code above should not be too difficult, so let’s move on to the next step.
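The builder idea can be reduced to a minimal sketch. `SimpleRequest` and `BuilderDemo` are hypothetical stand-ins, not OkHttp classes. The sketch only mirrors the two behaviours visible in the source above: the method defaults to GET, and build() rejects a missing url.

```java
// A minimal builder sketch. SimpleRequest is a hypothetical stand-in for Request.
final class SimpleRequest {
  final String url;
  final String method;

  // Non-public constructor: instances are created only through the Builder.
  private SimpleRequest(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
  }

  static final class Builder {
    private String url;
    private String method = "GET"; // default method, as in the source above

    Builder url(String url) {
      this.url = url;
      return this;
    }

    Builder method(String method) {
      this.method = method;
      return this;
    }

    SimpleRequest build() {
      // Validate before constructing, so a half-configured object never escapes.
      if (url == null) throw new IllegalStateException("url == null");
      return new SimpleRequest(this);
    }
  }
}

class BuilderDemo {
  public static void main(String[] args) {
    SimpleRequest request = new SimpleRequest.Builder().url("https://example.com/").build();
    System.out.println(request.method + " " + request.url);
  }
}
```

Keeping the constructor non-public means every instance passes through build(), so the validation there cannot be skipped.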

okHttpClient.newCall(request)

    // Returns a Call
    @Override public Call newCall(Request request) {
      // Create a RealCall
      return RealCall.newRealCall(this, request, false /* for web socket */);
    }

The @Override annotation tells us that this method is either inherited from a superclass or implements an interface method

    public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
      ***
    }

    // Declared inside the Call interface
    interface Factory {
      Call newCall(Request request);
    }

As you can see, this is the factory pattern. The caller does not care about the implementation details: the interface only declares a method that returns a Call object, and the details are left to the implementing class.
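A minimal sketch of the same factory idea, using plain Java. `Task`, `TaskFactory`, `Client`, and `RealTask` are hypothetical names chosen for illustration. They play the roles of Call, Call.Factory, OkHttpClient, and RealCall, but are not the OkHttp classes.

```java
// A minimal factory sketch. All names here are hypothetical.
interface Task {
  String run();
}

// Callers depend only on this interface, like Call.Factory in the text above.
interface TaskFactory {
  Task newTask(String input);
}

// The implementing class decides which concrete Task to create.
final class Client implements TaskFactory {
  @Override public Task newTask(String input) {
    return new RealTask(input);
  }

  // The concrete type is hidden from callers.
  private static final class RealTask implements Task {
    private final String input;

    RealTask(String input) {
      this.input = input;
    }

    @Override public String run() {
      return "handled " + input;
    }
  }
}

class FactoryDemo {
  public static void main(String[] args) {
    TaskFactory factory = new Client();
    System.out.println(factory.newTask("request").run());
  }
}
```

Because callers hold only a `TaskFactory`, the concrete task type can change without touching calling code.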

Let’s go back to our main flow

    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
      // Safely publish the Call instance to the EventListener.
      RealCall call = new RealCall(client, originalRequest, forWebSocket);
      // The Transmitter handles connections, streams, requests, and responses
      call.transmitter = new Transmitter(client, call);
      return call;
    }

An asynchronous request

With the Call in hand, the next step is to execute the request

    call.enqueue(object :Callback {
        override fun onFailure(call: Call, e: IOException) {

        }

        override fun onResponse(call: Call, response: Response) {

        }

    })

Let’s go straight to RealCall’s enqueue

    @Override public void enqueue(Callback responseCallback) {
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      // The transmitter calls eventListener.callStart()
      transmitter.callStart();
      // Here is the key code
      client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }
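The `executed` check at the top of enqueue is what makes a call single-use. A minimal sketch of that guard, assuming a hypothetical `OneShotCall` class that returns a string instead of dispatching real work:

```java
// Sketch of the execute-once guard. OneShotCall is a hypothetical class.
final class OneShotCall {
  private boolean executed; // guarded by this

  String enqueue() {
    // Check and set under the same lock so two threads cannot both pass the check.
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    return "enqueued";
  }
}

class OneShotDemo {
  public static void main(String[] args) {
    OneShotCall call = new OneShotCall();
    System.out.println(call.enqueue());
    try {
      call.enqueue(); // second attempt is rejected
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```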

Next, let’s look at the Dispatcher

    public Dispatcher dispatcher() {
      return dispatcher;
    }

    // Nothing much to see here, so move on to Dispatcher.enqueue
    void enqueue(AsyncCall call) {
      synchronized (this) {
        // Store the request in the ready queue
        readyAsyncCalls.add(call);

        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        // forWebSocket is false by default
        if (!call.get().forWebSocket) {
          // Look for an existing call to the same host: first in the running queue,
          // otherwise in the ready queue
          AsyncCall existingCall = findExistingCallWithHost(call.host());
          // Share the existing call's callsPerHost counter with this call
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
      }
      promoteAndExecute();
    }

    /**
     * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
     * them on the executor service. Must not be called with synchronization because executing calls
     * can call into user code.
     *
     * @return true if the dispatcher is currently running calls.
     */
    private boolean promoteAndExecute() {
      assert (!Thread.holdsLock(this));

      List<AsyncCall> executableCalls = new ArrayList<>();
      boolean isRunning;
      synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall asyncCall = i.next();

          // maxRequests defaults to 64
          if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
          if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

          i.remove();
          asyncCall.callsPerHost().incrementAndGet();
          executableCalls.add(asyncCall);
          // Join the running queue
          runningAsyncCalls.add(asyncCall);
        }
        isRunning = runningCallsCount() > 0;
      }

      for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        // Run the call on the executor service
        asyncCall.executeOn(executorService());
      }

      return isRunning;
    }

    // ------------------ AsyncCall ------------------
    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        // Execute the runnable
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        // Report the failure through the callback
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
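The promotion logic can be modelled in a few lines. This is a simplified sketch, not the OkHttp implementation: `MiniDispatcher` is a hypothetical class, calls are represented only by their host name, a per-host counter map stands in for the shared AtomicInteger, and promoted calls are returned to the caller instead of being run on an executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified model of the ready -> running promotion. MiniDispatcher is hypothetical.
final class MiniDispatcher {
  private final int maxRequests;
  private final int maxRequestsPerHost;
  private final Deque<String> readyHosts = new ArrayDeque<>();   // waiting calls, by host
  private final Deque<String> runningHosts = new ArrayDeque<>(); // running calls, by host
  private final Map<String, Integer> callsPerHost = new HashMap<>();

  MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
    this.maxRequests = maxRequests;
    this.maxRequestsPerHost = maxRequestsPerHost;
  }

  // Queue a call, then promote whatever is now eligible.
  synchronized List<String> enqueue(String host) {
    readyHosts.add(host);
    return promote();
  }

  // Mark one running call to this host as done, then promote waiting calls.
  synchronized List<String> finished(String host) {
    if (runningHosts.remove(host)) {
      callsPerHost.merge(host, -1, Integer::sum);
    }
    return promote();
  }

  private List<String> promote() {
    List<String> promoted = new ArrayList<>();
    for (Iterator<String> i = readyHosts.iterator(); i.hasNext(); ) {
      String host = i.next();
      if (runningHosts.size() >= maxRequests) break; // overall capacity reached
      if (callsPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host capacity
      i.remove();
      callsPerHost.merge(host, 1, Integer::sum);
      promoted.add(host);
      runningHosts.add(host);
    }
    return promoted;
  }
}

class DispatcherDemo {
  public static void main(String[] args) {
    MiniDispatcher dispatcher = new MiniDispatcher(2, 1);
    System.out.println(dispatcher.enqueue("a"));  // promoted immediately
    System.out.println(dispatcher.enqueue("a"));  // waits: host limit reached
    System.out.println(dispatcher.enqueue("b"));  // promoted: different host
    System.out.println(dispatcher.enqueue("c"));  // waits: overall limit reached
    System.out.println(dispatcher.finished("a")); // frees a slot for the waiting "a"
  }
}
```

Note the difference between `break` and `continue`: hitting the overall limit stops the scan, while hitting a host limit only skips that call so calls to other hosts can still be promoted.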

Let’s look at AsyncCall

    final class AsyncCall extends NamedRunnable {
        ***
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
        
        ***
    }

We can see that AsyncCall extends NamedRunnable

 /**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }
   
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

So AsyncCall is ultimately a Runnable.

Its run method sets the thread name and then calls execute(), which AsyncCall implements as follows:

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        // The important method: getResponseWithInterceptorChain()
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        // Deliver the network response through the callback
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        // Release the call
        client.dispatcher().finished(this);
      }
    }

We have finally found the Response. The network request itself happens inside getResponseWithInterceptorChain; the rest of the method handles exceptions, invokes the callback, and releases the call’s resources. We will not analyze those parts here. Instead, let’s look at getResponseWithInterceptorChain.

    Response getResponseWithInterceptorChain() throws IOException {
      // Build a full stack of interceptors.
      List<Interceptor> interceptors = new ArrayList<>();
      // Add the user's custom interceptors
      interceptors.addAll(client.interceptors());
      // Add the retry and redirect interceptor
      interceptors.add(new RetryAndFollowUpInterceptor(client));
      // Bridge from application code to network code
      interceptors.add(new BridgeInterceptor(client.cookieJar()));
      // Serve requests from the cache and write responses to it
      interceptors.add(new CacheInterceptor(client.internalCache()));
      // Open a connection to the target server
      interceptors.add(new ConnectInterceptor(client));
      if (!forWebSocket) {
        // The networkInterceptors configured on the OkHttpClient
        interceptors.addAll(client.networkInterceptors());
      }
      // The last interceptor makes the network call to the server
      interceptors.add(new CallServerInterceptor(forWebSocket));

      // originalRequest is the actual request; 0 is the index of the first interceptor
      Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
          originalRequest, this, client.connectTimeoutMillis(),
          client.readTimeoutMillis(), client.writeTimeoutMillis());

      boolean calledNoMoreExchanges = false;
      try {
        Response response = chain.proceed(originalRequest);
        if (transmitter.isCanceled()) {
          closeQuietly(response);
          throw new IOException("Canceled");
        }
        return response;
      } catch (IOException e) {
        calledNoMoreExchanges = true;
        throw transmitter.noMoreExchanges(e);
      } finally {
        if (!calledNoMoreExchanges) {
          transmitter.noMoreExchanges(null);
        }
      }
    }

The Response is obtained by calling proceed on the Interceptor.Chain. RealInterceptorChain is the implementation of Interceptor.Chain, so let’s look at its proceed method

    @Override public Response proceed(Request request) throws IOException {
      return proceed(request, transmitter, exchange);
    }

    public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
        throws IOException {
      if (index >= interceptors.size()) throw new AssertionError();

      // Count the calls, to check that each interceptor calls proceed() only once
      calls++;

      // If we already have a stream, confirm that the incoming request will use it.
      if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
            + " must retain the same host and port");
      }

      // If we already have a stream, confirm that this is the only call to chain.proceed().
      if (this.exchange != null && calls > 1) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
            + " must call proceed() exactly once");
      }

      // Call the next interceptor in the chain.
      // Note the index + 1
      RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
          index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
      // Get the interceptor at the current index
      Interceptor interceptor = interceptors.get(index);
      // Run its intercept method to get the Response
      Response response = interceptor.intercept(next);

      // Confirm that the next interceptor made its required call to chain.proceed().
      if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
        throw new IllegalStateException("network interceptor " + interceptor
            + " must call proceed() exactly once");
      }

      // Confirm that the intercepted response isn't null.
      if (response == null) {
        throw new NullPointerException("interceptor " + interceptor + " returned null");
      }

      if (response.body() == null) {
        throw new IllegalStateException(
            "interceptor " + interceptor + " returned a response with no body");
      }

      return response;
    }

As shown above, calling intercept returns the Response. Let’s step inside, starting with one of the built-in interceptors.

    public final class RetryAndFollowUpInterceptor implements Interceptor {
      ***
      @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Transmitter transmitter = realChain.transmitter();

        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
          // Prepare the connection for this request
          transmitter.prepareToConnect(request);

          if (transmitter.isCanceled()) {
            throw new IOException("Canceled");
          }

          Response response;
          boolean success = false;
          try {
            // !!! Hand the request on to the rest of the chain
            response = realChain.proceed(request, transmitter, null);
            success = true;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), transmitter, false, request)) {
              throw e.getFirstConnectException();
            }
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, transmitter, requestSendStarted, request)) throw e;
            continue;
          } finally {
            // The network call threw an exception. Release any resources.
            if (!success) {
              transmitter.exchangeDoneDueToException();
            }
          }

          // Attach the prior response if it exists. Such responses never have a body.
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
                .build();
          }

          Exchange exchange = Internal.instance.exchange(response);
          Route route = exchange != null ? exchange.connection().route() : null;
          Request followUp = followUpRequest(response, route);

          if (followUp == null) {
            if (exchange != null && exchange.isDuplex()) {
              transmitter.timeoutEarlyExit();
            }
            return response;
          }

          RequestBody followUpBody = followUp.body();
          if (followUpBody != null && followUpBody.isOneShot()) {
            return response;
          }

          closeQuietly(response.body());
          if (transmitter.hasExchange()) {
            exchange.detachWithViolence();
          }

          if (++followUpCount > MAX_FOLLOW_UPS) {
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }

          request = followUp;
          priorResponse = response;
        }
      }
      ***
    }

The code above is the most interesting part of OkHttp and its most classic pattern: the chain of responsibility. It works a little like recursion. Each interceptor calls proceed to hand the request to the next one, layer by layer, and the Response is then passed back up through the same layers.
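The chain of responsibility described here can be sketched without OkHttp. In this minimal example, `Step`, `StepChain`, and `ChainDemo` are hypothetical names, and plain strings stand in for Request and Response. The key detail carried over from RealInterceptorChain is that proceed builds a new chain at `index + 1` and passes it to the step at `index`.

```java
import java.util.ArrayList;
import java.util.List;

// Chain-of-responsibility sketch. All names are hypothetical, not OkHttp classes.
interface Step {
  String intercept(StepChain chain);
}

final class StepChain {
  private final List<Step> steps;
  private final int index;
  private final String request;

  StepChain(List<Step> steps, int index, String request) {
    this.steps = steps;
    this.index = index;
    this.request = request;
  }

  String request() {
    return request;
  }

  String proceed(String request) {
    if (index >= steps.size()) throw new AssertionError();
    // Build the chain for the next step (index + 1), then run the current step with it.
    StepChain next = new StepChain(steps, index + 1, request);
    return steps.get(index).intercept(next);
  }
}

class ChainDemo {
  static String run(String request) {
    List<Step> steps = new ArrayList<>();
    // The first two steps edit the request on the way down
    // and wrap the response on the way back up.
    steps.add(chain -> "[outer " + chain.proceed(chain.request() + "+header") + "]");
    steps.add(chain -> "[inner " + chain.proceed(chain.request() + "+cookie") + "]");
    // The last step produces the response and does not call proceed().
    steps.add(chain -> "response to " + chain.request());
    return new StepChain(steps, 0, request).proceed(request);
  }

  public static void main(String[] args) {
    System.out.println(run("GET /"));
  }
}
```

Running `ChainDemo.run("GET /")` returns `[outer [inner response to GET /+header+cookie]]`: the request accumulates changes going down the chain, and each step wraps the response coming back.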

A synchronous request

    @Override public Response execute() throws IOException {
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      transmitter.timeoutEnter();
      transmitter.callStart();
      try {
        // Add the call to the dispatcher's queue
        client.dispatcher().executed(this);
        // Actually perform the request
        return getResponseWithInterceptorChain();
      } finally {
        client.dispatcher().finished(this);
      }
    }

After reading the asynchronous path, the synchronous one is very simple: it runs the same flow directly on the calling thread. Let’s stop here.
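The relationship between the two paths can be sketched as follows. `MiniCall`, `doRequest`, and `SyncAsyncDemo` are hypothetical names; `doRequest` stands in for getResponseWithInterceptorChain(), and a `Consumer<String>` stands in for the Callback. This is an illustration under those assumptions, not how RealCall is written.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: the same work runs either on the caller's thread or on an executor thread.
final class MiniCall {
  private final String url;

  MiniCall(String url) {
    this.url = url;
  }

  // Hypothetical stand-in for getResponseWithInterceptorChain().
  private String doRequest() {
    return "response for " + url;
  }

  // Synchronous: blocks the caller and returns the result directly.
  String execute() {
    return doRequest();
  }

  // Asynchronous: runs on the executor and delivers the result through a callback.
  void enqueue(ExecutorService executor, Consumer<String> onResponse) {
    executor.execute(() -> onResponse.accept(doRequest()));
  }
}

class SyncAsyncDemo {
  // Runs the async path and waits for the callback, returning null on timeout.
  static String runAsync(String url) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch done = new CountDownLatch(1);
    String[] result = new String[1];
    try {
      new MiniCall(url).enqueue(executor, response -> {
        result[0] = response;
        done.countDown();
      });
      return done.await(5, TimeUnit.SECONDS) ? result[0] : null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(new MiniCall("https://example.com/").execute());
    System.out.println(runAsync("https://example.com/"));
  }
}
```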