Preface

This article is a detailed analysis of the open source OkHttp library. If you are not yet familiar with OkHttp and want to learn more, I believe it will help you.

It covers a detailed walkthrough of the request process, an explanation of the major interceptors, and a short summary of my own reflections. The article is long; sharing and discussion are welcome.

Usage

Usage is simple: create an OkHttpClient object and a Request object, then use both to create a Call object. Finally, call the synchronous execute() method or the asynchronous enqueue() method to get the Response.

private final OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder()
    .url("https://github.com/")
    .build();

// Synchronous request
Response response = client.newCall(request).execute();
// TODO: handle the response

// Asynchronous request
client.newCall(request).enqueue(new Callback() {
  @Override
  public void onFailure(@NotNull Call call, @NotNull IOException e) {
    // TODO: handle the failed request
  }

  @Override
  public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
    // TODO: handle the response
  }
});

Introduction to Basic Objects

In the usage section we built an OkHttpClient object, a Request object and a Call object. What do these objects represent, and what do they do? Let's look at each in turn.

OkHttpClient

A configuration class for requests. It uses the builder pattern, which makes it easy to configure request parameters such as callTimeout, cookies and interceptors.

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  constructor() : this(Builder())

  class Builder constructor() {
    // Scheduler
    internal var dispatcher: Dispatcher = Dispatcher()
    // Connection pool
    internal var connectionPool: ConnectionPool = ConnectionPool()
    // Whole process interceptor
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    // Network flow interceptor
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    // Process listener
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    // Whether to reconnect when the connection fails
    internal var retryOnConnectionFailure = true
    // Server authentication Settings
    internal var authenticator: Authenticator = Authenticator.NONE
    // Whether to redirect
    internal var followRedirects = true
    // Whether to redirect from HTTP to HTTPS
    internal var followSslRedirects = true
    // Cookie settings
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    // Cache Settings
    internal var cache: Cache? = null
    // DNS settings
    internal var dns: Dns = Dns.SYSTEM
    // Proxy Settings
    internal var proxy: Proxy? = null
    // Proxy selector Settings
    internal var proxySelector: ProxySelector? = null
    // Proxy server authentication Settings
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    // Socket configuration
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    // HTTPS socket configuration
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    // Protocols
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    // Verify domain name
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    // Call timeout (0 means no timeout)
    internal var callTimeout = 0
    // Connect timeout
    internal var connectTimeout = 10_000
    // Read timeout
    internal var readTimeout = 10_000
    // Write timeout
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null

    // ... code omitted

Request

Request is likewise a configuration class for request parameters and also uses the builder pattern. Compared with OkHttpClient it is very simple, with only four main parameters: the request URL, request method, request headers and request body.

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,

  internal val tags: Map<Class<*>, Any>
) {

  open class Builder {
    // The requested URL
    internal var url: HttpUrl? = null
    // Request methods, such as GET, POST..
    internal var method: String
    // Request headers
    internal var headers: Headers.Builder
    // Request body
    internal var body: RequestBody? = null

    // ... code omitted

Call

The request invocation interface. A Call represents a request that is ready to be executed; it can be cancelled, and it can be executed only once.

interface Call : Cloneable {
  /** Returns the original request that made this call */
  fun request(): Request

  /**
   * Synchronous request: executes immediately and blocks until the response is available.
   * 1. Throws IOException if the request fails.
   * 2. Throws IllegalStateException if the call has already been executed.
   */
  @Throws(IOException::class)
  fun execute(): Response

  /**
   * Asynchronous request: schedules the request to be executed at some point in the future.
   * Throws IllegalStateException if the call has already been executed.
   */
  fun enqueue(responseCallback: Callback)

  /** Cancels the request. Requests that have already completed cannot be cancelled. */
  fun cancel()

  /** Whether the call has been executed or enqueued */
  fun isExecuted(): Boolean

  /** Whether the call has been cancelled */
  fun isCanceled(): Boolean

  /** Timeout spanning the entire call; configured by default from [OkHttpClient.Builder.callTimeout] */
  fun timeout(): Timeout

  /** Clone the call and create a new identical call */
  public override fun clone(): Call

  /** Use factory mode to let OkHttpClient create the Call object */
  fun interface Factory {
    fun newCall(request: Request): Call
  }
}

RealCall

In OkHttpClient, we use the newCall method to create a Call object, but we can see from the source that the newCall method returns a RealCall object.

OkHttpClient.kt

override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCall is the concrete implementation of the Call interface. It is the bridge between the application layer and the network layer, exposing the application's original request and connection data as well as the response and other data streams returned by the network layer. RealCall also contains the synchronous execute() and asynchronous enqueue() methods, which are analyzed in detail later.

AsyncCall

AsyncCall is an asynchronous request invocation. It is an inner class of RealCall and a Runnable that is executed by the thread pool in the scheduler.

inner class AsyncCall(
    // The response callback method passed by the user
    private val responseCallback: Callback
  ) : Runnable {
    // Number of in-flight requests to the same host. @Volatile makes a swapped-in counter
    // visible across threads; AtomicInteger makes the updates atomic
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    // ... code omitted

    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        // Call thread pool execution
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        // The executor rejected the call: report the failure through Callback.onFailure()
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          // The call never started, so tell the scheduler it has finished
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          // Run the interceptor chain to obtain the server's response
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          // Pass the response to Callback.onResponse()
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            // The request failed: report it through Callback.onFailure()
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          // Unexpected error: cancel the request
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            // Report the failure through Callback.onFailure()
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          // The call has ended: tell the scheduler it has finished
          client.dispatcher.finished(this)
        }
      }
    }
  }

Dispatcher

The scheduler, which schedules Call objects, also contains a thread pool and an asynchronous request queue for storing and executing AsyncCall objects.

class Dispatcher constructor() {
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        // Create a cached-style thread pool (no core threads, unbounded maximum) to run calls
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  /** Queue of asynchronous calls that are ready to run */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Queue of running asynchronous calls, including cancelled AsyncCalls that have not finished yet */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Queue of running synchronous calls, including cancelled RealCalls that have not finished yet */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  // ... code omitted

To summarize

Object: Role
Call: The request invocation interface. It represents a request that is ready to run, can be cancelled, and can be executed only once.
RealCall: The concrete implementation of the Call interface and the bridge between the application and the network layer. It holds the OkHttpClient and Request information.
AsyncCall: An asynchronous request invocation. It is in fact a Runnable that is handed to the thread pool for execution.
Dispatcher: The scheduler for Call objects. It contains a thread pool and the asynchronous request queues used to store and execute AsyncCall objects.
Request: The request class, containing url, method, headers and body.
Response: The response data returned by the network layer.
Callback: The response callback interface, with two methods: onFailure and onResponse.

Process analysis

With the objects introduced, let's follow the usage example through the source code.

Synchronous request

A synchronous request is made like this:

client.newCall(request).execute();

The newCall method simply creates a RealCall object, whose execute() method is then called.

  RealCall.kt

  override fun execute(): Response {
    // CAS check: a call may be executed only once; throw if it has already been executed
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    // Start the call timeout
    timeout.enter()
    // Notify the event listener that the call has started
    callStart()
    try {
      // Call the scheduler's executed() method, which simply adds the call to the runningSyncCalls queue
      client.dispatcher.executed(this)
      // Call getResponseWithInterceptorChain() to get the response
      return getResponseWithInterceptorChain()
    } finally {
      // After execution, the scheduler removes the call from the runningSyncCalls queue
      client.dispatcher.finished(this)
    }
  }

execute() calls the scheduler's executed method, which adds the current RealCall object to the runningSyncCalls queue, and then calls getResponseWithInterceptorChain to get the response.
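The "execute only once" guard at the top of execute() can be looked at in isolation. The following is a minimal Java sketch, not OkHttp code: the class OneShotCall and its dummy return value are hypothetical, and only the AtomicBoolean.compareAndSet idea is taken from the source shown here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for RealCall; only the execute-once guard is modelled. */
final class OneShotCall {
    private final AtomicBoolean executed = new AtomicBoolean(false);

    /** Returns a dummy result the first time; throws on every later call. */
    String execute() {
        // compareAndSet flips false -> true atomically, so exactly one caller can win
        if (!executed.compareAndSet(false, true)) {
            throw new IllegalStateException("Already Executed");
        }
        return "response";
    }

    boolean isExecuted() {
        return executed.get();
    }
}
```

Because the check and the update happen in one atomic step, two threads racing to execute the same call cannot both pass the guard, and no lock is needed.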

Asynchronous request

Let’s look at asynchronous requests.

  RealCall.kt

  override fun enqueue(responseCallback: Callback) {
    // CAS check: a call may be executed only once; throw if it has already been executed
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    // Notify the event listener that the call has started
    callStart()
    // Wrap the callback in a new AsyncCall and hand it to the scheduler's enqueue method,
    // which adds it to the readyAsyncCalls queue
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

It then calls the scheduler’s enqueue method:

  Dispatcher.kt
  
  internal fun enqueue(call: AsyncCall) {
    // Lock to ensure thread safety
    synchronized(this) {
      // Add the request call to the readyAsyncCalls queue
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        // If a call to the same host already exists, reuse its per-host counter
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    // Execute the request
    promoteAndExecute()
  }


  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    // Determine whether a request is being executed
    val isRunning: Boolean
    // Lock to ensure thread safety
    synchronized(this) {
      // Iterate over the readyAsyncCalls queue
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        // runningAsyncCalls may not exceed the maximum number of concurrent requests (64 by default)
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        // At most 5 requests (by default) may run against the same host at the same time
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        // Remove from the readyAsyncCalls queue and join the executableCalls and runningAsyncCalls queues
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      // Determine whether a request is being executed by the number of requests in the run queue
      isRunning = runningCallsCount() > 0
    }

    // Iterate over the executable queue and call the thread pool to execute AsyncCall
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

The scheduler's enqueue method simply adds the AsyncCall to the readyAsyncCalls queue and calls promoteAndExecute to execute requests. All promoteAndExecute does is iterate over the readyAsyncCalls queue and hand each eligible call to the thread pool, which runs its AsyncCall.run() method.
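The promotion rule is easier to follow with small numbers. Below is a simplified, single-threaded Java sketch of the same idea, assuming a call can be represented by its host name alone. MiniDispatcher and all of its members are hypothetical names, there is no locking and no thread pool, and the limits are passed in rather than fixed at 64 and 5.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the promotion rule; this is not OkHttp's Dispatcher. */
final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> readyCalls = new ArrayDeque<>();   // hosts of waiting calls
    private final List<String> runningCalls = new ArrayList<>();   // hosts of running calls
    private final Map<String, Integer> callsPerHost = new HashMap<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    /** Queues a call for the host, then promotes whatever is eligible. */
    List<String> enqueue(String host) {
        readyCalls.add(host);
        return promote();
    }

    /** Marks one running call for the host as finished, then promotes waiting calls. */
    List<String> finished(String host) {
        if (!runningCalls.remove(host)) throw new AssertionError("Call wasn't in-flight!");
        callsPerHost.merge(host, -1, Integer::sum);
        return promote();
    }

    /** Moves eligible calls from ready to running and returns the ones just started. */
    private List<String> promote() {
        List<String> started = new ArrayList<>();
        Iterator<String> i = readyCalls.iterator();
        while (i.hasNext()) {
            String host = i.next();
            if (runningCalls.size() >= maxRequests) break;                          // global limit
            if (callsPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // per-host limit
            i.remove();
            callsPerHost.merge(host, 1, Integer::sum);
            runningCalls.add(host);
            started.add(host);
        }
        return started;
    }

    int runningCount() { return runningCalls.size(); }

    int readyCount() { return readyCalls.size(); }
}
```

Note the difference between break and continue: hitting the global limit stops the scan, while a host that is at its own limit is skipped so that calls to other hosts further back in the queue can still start.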

The code of AsyncCall.run() was shown in the AsyncCall section above, so it is not repeated here. In short, it calls getResponseWithInterceptorChain to get the response and passes it on through Callback.onResponse. If the request fails and an exception is caught, the exception is passed on through Callback.onFailure instead. Finally, when the request ends, the scheduler's finished method is called.

  Dispatcher.kt

  /** Asynchronous request call end method */
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Call the end method */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      // Removes the current request invocation from the running queue
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    // Continue with the rest of the request, add call from readyAsyncCalls to runningAsyncCalls, and execute
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      // Call the idle callback method if all requests have been executed and the state is idle
      idleCallback.run()
    }
  }

Obtaining the Response

Next, let's look at how getResponseWithInterceptorChain gets the response.

  internal fun getResponseWithInterceptorChain(): Response {
    // List of interceptors
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    // Build the interceptor chain of responsibility
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )
    // If the call request completes, that means the interaction is complete and there is nothing more to exchange
    var calledNoMoreExchanges = false
    try {
      // Execute the interceptor responsibility chain to get response
      val response = chain.proceed(originalRequest)
      // If cancelled, close the response and throw an exception
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

To recap: the chain of responsibility design pattern is used here. The interceptors are assembled into a RealInterceptorChain, and its proceed method is called to get the response.

So, what is an interceptor? What is the chain of responsibility for interceptors?

Interceptor

The Interceptor interface declares a single method, intercept, which implementing classes provide, and contains a nested Chain interface. The core method of Chain is proceed(request), which processes the request and returns a response.

fun interface Interceptor {
  /** Intercepting method */
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response

  interface Chain {
    /** Original request data */
    fun request(): Request

    /** core method, process request, get response */
    @Throws(IOException::class)
    fun proceed(request: Request): Response

    fun connection(): Connection?

    fun call(): Call

    fun connectTimeoutMillis(): Int

    fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain

    fun readTimeoutMillis(): Int

    fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain

    fun writeTimeoutMillis(): Int

    fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
  }
}

RealInterceptorChain

RealInterceptorChain is the interceptor chain. It implements the Interceptor.Chain interface; the key part is its override of the proceed method.

class RealInterceptorChain(
  internal val call: RealCall,
  private val interceptors: List<Interceptor>,
  private val index: Int,
  internal val exchange: Exchange?,
  internal val request: Request,
  internal val connectTimeoutMillis: Int,
  internal val readTimeoutMillis: Int,
  internal val writeTimeoutMillis: Int
) : Interceptor.Chain {

  // ... code omitted

  private var calls: Int = 0
  override fun call(): Call = call
  override fun request(): Request = request

  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Copy the chain with index + 1: the new chain points at the next interceptor
    val next = copy(index = index + 1, request = request)
    // Retrieve the current interceptor
    val interceptor = interceptors[index]

    // Run the current interceptor's intercept method, handing it the next chain
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
}

Through these chained calls, each interceptor in the list is eventually executed and the Response is returned.
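The way proceed advances through the list with index + 1 can be reduced to a few lines. The following is a minimal Java sketch of the pattern only, assuming requests and responses are plain strings. MiniInterceptor, MiniChain and MiniChainDemo are hypothetical names, and none of the exchange checks from RealInterceptorChain are modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of Interceptor; requests and responses are plain strings. */
interface MiniInterceptor {
    String intercept(MiniChain chain);
}

/** Each chain instance is immutable and points at one position in the interceptor list. */
final class MiniChain {
    private final List<MiniInterceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    /** Runs the interceptor at index, handing it a new chain that starts at index + 1. */
    String proceed(String request) {
        if (index >= interceptors.size()) {
            throw new IllegalStateException("no interceptor left");
        }
        MiniChain next = new MiniChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

final class MiniChainDemo {
    static String run() {
        List<MiniInterceptor> interceptors = new ArrayList<>();
        // Rewrites the request on the way in and decorates the response on the way out
        interceptors.add(chain -> "[" + chain.proceed(chain.request() + "+header") + "]");
        // Terminal interceptor: produces the response and never calls proceed()
        interceptors.add(chain -> "response to " + chain.request());
        return new MiniChain(interceptors, 0, "GET /").proceed("GET /");
    }
}
```

Each interceptor sees the request before the interceptors after it and the response after them, which is why the position of an interceptor in the list matters so much in the sections that follow.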

The interceptors

OK, now it’s time to look at specific interceptors in the list of interceptors.

Here’s a summary of the interceptors, in order:

  1. client.interceptors: Set by the developer. These run before any other interceptor, so they see the request first. They can be used to add common parameters such as custom headers or custom logging.
  2. RetryAndFollowUpInterceptor: Does some connection initialization, retries failed requests, and follows redirects. As its name implies, it handles retries and follow-up requests.
  3. BridgeInterceptor: The bridge between the client and the server. It converts the user-built request into the request the server needs, and converts the network response back into a response the user can consume.
  4. CacheInterceptor: Handles caching. Using the cache configured on OkHttpClient, it creates a cache strategy for the request and decides whether to build the response from the network or from the cache.
  5. ConnectInterceptor: Establishes the connection, either a plain TCP connection or a TLS connection.
  6. client.networkInterceptors: Also set by the developer. In essence these are the same as the first kind, but they behave differently because of their position in the chain.
  7. CallServerInterceptor: Performs the actual network I/O: it sends the request headers and body to the server and parses the data the server returns into a response.

Let’s take a look at the interceptors one by one, from top to bottom.

client.interceptors

This is a user-defined interceptor, called an application interceptor, stored in the interceptors: List<Interceptor> list of OkHttpClient. It is the first interceptor in the chain of responsibility, so its intercept method executes first. We can use it to add custom headers, for example:

class HeaderInterceptor implements Interceptor {
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request().newBuilder()
                .addHeader("device-android", "xxxxxxxxxxx")
                .addHeader("country-code", "ZH")
                .build();
        return chain.proceed(request);
    }
}

// Then add it to the OkHttpClient
OkHttpClient client = new OkHttpClient.Builder()
    .connectTimeout(60, TimeUnit.SECONDS)
    .readTimeout(15, TimeUnit.SECONDS)
    .writeTimeout(15, TimeUnit.SECONDS)
    .cookieJar(new MyCookieJar())
    .addInterceptor(new HeaderInterceptor()) // Add the custom header interceptor
    .build();

RetryAndFollowUpInterceptor

The second interceptor. As its name suggests, it is responsible for retrying failed requests and following redirects, and it also does some initialization of the connection.

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    var request = chain.request
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      // An ExchangeFinder will be created, which will be used by the ConnectInterceptor
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          // Failed to connect through routing. The request will not be sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          // Failed to communicate with the server. The request may have been sent.
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        // Based on the responseCode, a new request is constructed and returned to retry or redirect
        val followUp = followUpRequest(response, exchange)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }
        // A one-shot request body cannot be sent again, so return the response as it is
        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()

        // Cap the number of follow-ups. Browsers use different limits,
        // for example 21 for Chrome and 16 for Safari
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }

  /** Decides whether to retry: false means do not retry, true means try to recover. */
  private fun recover(
    e: IOException,
    call: RealCall,
    userRequest: Request,
    requestSendStarted: Boolean
  ): Boolean {
    // The client disallows retries
    if (!client.retryOnConnectionFailure) return false

    // The request body cannot be sent again
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

    // The exception is fatal and cannot be recovered from, e.g. ProtocolException
    if (!isRecoverable(e, requestSendStarted)) return false

    // There are no more routes to try
    if (!call.retryAfterFailure()) return false

    // For failure recovery, use the same route selector with a new connection
    return true
  }

  // ... code omitted

BridgeInterceptor

As its name suggests, it is the bridge between the client and the server. It converts the user-built request into the request the server needs, for example by adding Content-Type, Cookie and User-Agent headers. It then converts the response returned by the server into the response the client needs, for example by removing Content-Encoding and Content-Length from the response headers once it has transparently decompressed a gzip body.
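The response side of this conversion, transparent gzip handling, can be illustrated without OkHttp. The following Java sketch is an assumption-laden stand-in: GzipBridgeSketch and its methods are hypothetical, headers are modelled as a plain mutable map, and java.util.zip replaces the Okio GzipSource that the real interceptor uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical illustration of transparent gzip decoding; not OkHttp's BridgeInterceptor. */
final class GzipBridgeSketch {
    /** Helper that plays the role of the server: compresses text with gzip. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /**
     * Decodes the body. If the headers announce gzip, the body is decompressed and the
     * two headers that no longer describe it are removed from the given map.
     */
    static String decodeBody(Map<String, String> headers, byte[] body) {
        if (!"gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) {
            return new String(body, StandardCharsets.UTF_8);
        }
        headers.remove("Content-Encoding");
        headers.remove("Content-Length");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

The two headers are dropped because they describe the compressed bytes on the wire; after decompression the caller receives a body with a different length and no encoding.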

class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    // Get the original request data
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()
    // Rebuild the request headers from the request body (abridged: the original code sets
    // each header only when applicable, e.g. either Content-Length or chunked Transfer-Encoding)
    val body = userRequest.body

    val contentType = body.contentType()
    val contentLength = body.contentLength()
    requestBuilder.header("Content-Type", contentType.toString())
    requestBuilder.header("Content-Length", contentLength.toString())
    requestBuilder.header("Transfer-Encoding", "chunked")
    requestBuilder.header("Host", userRequest.url.toHostHeader())
    requestBuilder.header("Connection", "Keep-Alive")

    // ... code omitted

    // Add cookies
    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }
    // Add the User-Agent
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
    // Rebuild a Request and execute the next interceptor to process the Request
    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    // Create a new responseBuilder to build the original request data into the response
    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        // Modify response header information to remove content-encoding and content-Length information
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        // Modify the response Body information
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }
    
    return responseBuilder.build()
  }

  // ... code omitted

CacheInterceptor

Users can configure a cache through OkHttpClient.Builder.cache(). The cache interceptor uses CacheStrategy to decide whether the response is built from the network, from the cache, or from both.

class CacheInterceptor(internal val cache: Cache?) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    // Look up a cached response for this request in OkHttpClient.cache
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
    // Create a cache policy to determine how to use the cache
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    // If the value is empty, the network is not used. Otherwise, the network is used
    val networkRequest = strategy.networkRequest
    // If the value is null, caching is not used; otherwise, caching is used
    val cacheResponse = strategy.cacheResponse
    // Track network and cache usage
    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
    // There is a cache candidate but it is not applicable, so close it
    if (cacheCandidate != null && cacheResponse == null) {
      cacheCandidate.body?.closeQuietly()
    }
    // If the network is forbidden and there is no usable cache, build a response with code 504 and return it
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    // If the network is not needed and a cached response exists, build the response directly from the cache and return it
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }
    // Notify the event listener about the cache outcome
    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      // The responsibility chain goes down, returning response from the server to networkResponse
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If the request failed (networkResponse is null) and a cache candidate exists, close its body so it does not leak
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }
    // If there is a cached response
    if (cacheResponse != null) {
      // If the network returns 304 (Not Modified), build a new response from the cached content
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        // Otherwise, close the cached response body
        cacheResponse.body?.closeQuietly()
      }
    }
    // Build the response from the network result
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()
    // If cache is not null, i.e. the user configured a cache on OkHttpClient, store the response built in the previous step
    if (cache != null) {
      // Check whether the response can be cached, based on the status code, the headers and CacheControl.noStore
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Cache the response
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            listener.cacheMiss(call)
          }
        }
      }
      // Requests whose method invalidates the cache (such as POST, PUT, PATCH and DELETE) remove the cached entry
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          // The cache is invalid. Remove the request cache from the client cache configuration
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }
  ... omit code ...
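
The branching in the listing above boils down to two values returned by CacheStrategy: networkRequest and cacheResponse. The following sketch is a simplified model of that decision only, under the assumption that two booleans stand in for "is not null". The class CacheDecisionSketch, its Outcome enum and its method names are hypothetical and do not exist in OkHttp.

```java
// Hypothetical model of the CacheInterceptor branches; not OkHttp code.
class CacheDecisionSketch {

    enum Outcome { UNSATISFIABLE_504, CACHE_ONLY, CONDITIONAL_NETWORK, NETWORK_ONLY }

    // hasNetworkRequest / hasCacheResponse stand in for
    // "strategy.networkRequest != null" and "strategy.cacheResponse != null".
    static Outcome decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) {
            return Outcome.UNSATISFIABLE_504;   // network forbidden, nothing usable cached
        }
        if (!hasNetworkRequest) {
            return Outcome.CACHE_ONLY;          // answer straight from the cache
        }
        if (hasCacheResponse) {
            return Outcome.CONDITIONAL_NETWORK; // ask the server, keep the cached copy at hand
        }
        return Outcome.NETWORK_ONLY;            // plain network request
    }

    // After the network call: a 304 means the cached body is still valid.
    static String bodySource(boolean hasCacheResponse, int networkCode) {
        if (hasCacheResponse && networkCode == 304) {
            return "cache";
        }
        return "network";
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false));
        System.out.println(decide(true, true) + " -> " + bodySource(true, 304));
    }
}
```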

ConnectInterceptor

This interceptor is responsible for actually establishing the connection to the server.

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    // Initialize an Exchange object
    val exchange = realChain.call.initExchange(chain)
    // Create a new connection responsibility chain based on the Exchange object
    val connectedChain = realChain.copy(exchange = exchange)
    // Execute the link responsibility chain
    return connectedChain.proceed(realChain.request)
  }
}

At a glance the code is very simple; the intercept method has only three steps:

  1. Initialize an Exchange object.
  2. Create a new interceptor chain based on this Exchange object.
  3. Proceed along that new chain.

So what is this Exchange object?

RealCall.kt

internal fun initExchange(chain: RealInterceptorChain): Exchange {
    ... omit code ...
    // The exchangeFinder here was created in RetryAndFollowUpInterceptor
    val exchangeFinder = this.exchangeFinder!!
    // Returns an ExchangeCodec (a codec that encodes the request and decodes the response)
    val codec = exchangeFinder.find(client, chain)
    // Build a new Exchange object from the exchangeFinder and the codec, and return it
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    ... omit code ...
    return result
  }

Let's look more closely at the exchangeFinder.find() step:

ExchangeFinder.kt

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      // Find a qualified available connection and return a RealConnection object
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      // Depending on the connection, create and return a codec: Http1ExchangeCodec or Http2ExchangeCodec, for HTTP/1.1 and HTTP/2 respectively
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

Continue to look at the findHealthyConnection method

ExchangeFinder.kt

  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      // Key: find the connection
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )
      // Check whether the connection is available, if yes, directly return to the connection
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }
      // If the connection is not healthy, mark it so that it accepts no new exchanges; the connection pool will then evict it
      candidate.noNewExchanges()
      ... omit code ...
    }
  }

To summarize: findHealthyConnection calls findConnection in a loop to find a connection, checks whether it is healthy, and returns it if it is. If it is not, the connection is marked as unusable and the loop tries again.

So the core method is findConnection. Let’s take a closer look at this method:

private fun findConnection(
    connectTimeout: Int, 
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // The first time, try to reuse the connection already held by the call, without acquiring a new one
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the connection in call has not been released, reuse it.
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // If the connection in call has been released, close its socket.
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // A new connection is required, so reset some state
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // The second time, try to get a connection from the connection pool, without routes and without requiring multiplexing
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Nothing suitable in the connection pool; work out the routes for the next attempt
    val routes: List<Route>?
    val route: Route
    ... omit code ...
      // The third time, try the connection pool again, this time with routes, without requiring multiplexing
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // The fourth time, manually create a new connection
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // The fifth time, try the connection pool once more, with routes and requiring multiplexing.
    // This guards against a race: if another call has meanwhile established a multiplexed (HTTP/2) connection
    // to the same host, reuse that one and close the connection that was just created.
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      // Add the newly created connection to the connection pool
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }

As you can see from the code, there are five attempts to get a connection:

  1. The first time, try to reuse the connection already held by the call, without acquiring a new one.
  2. The second time, try to get a connection from the connection pool, without routes and without requiring multiplexing.
  3. The third time, try the connection pool again, with routes, without requiring multiplexing.
  4. The fourth time, manually create a new connection.
  5. The fifth time, try the connection pool once more, with routes and requiring multiplexing. If another call created a multiplexed connection in the meantime, that one is used and the new connection is closed.

OK, so at this point, we’ve established a connection.
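
The order of the five attempts can be condensed into a small decision function. This is only a sketch under simplifying assumptions: each boolean says whether that attempt would find a usable connection, and the names ConnectionLookupSketch, Source and find are hypothetical. The real findConnection works on RealConnection objects, routes and the connection pool.

```java
// Hypothetical condensed model of the five lookup attempts; not OkHttp code.
class ConnectionLookupSketch {

    enum Source { CALL, POOL, POOL_WITH_ROUTES, POOL_MULTIPLEXED, NEW }

    static Source find(boolean callHasUsableConnection,
                       boolean poolHit,
                       boolean poolHitWithRoutes,
                       boolean multiplexedAppearedWhileConnecting) {
        // Attempt 1: the connection the call already holds.
        if (callHasUsableConnection) return Source.CALL;
        // Attempt 2: the pool, without routes.
        if (poolHit) return Source.POOL;
        // Attempt 3: the pool again, now with the resolved routes.
        if (poolHitWithRoutes) return Source.POOL_WITH_ROUTES;
        // Attempt 4: a brand-new connection is created and connected here.
        // Attempt 5: if a multiplexed connection showed up in the pool meanwhile,
        // prefer it and discard the new one.
        if (multiplexedAppearedWhileConnecting) return Source.POOL_MULTIPLEXED;
        // Otherwise the new connection is put into the pool and returned.
        return Source.NEW;
    }

    public static void main(String[] args) {
        System.out.println(find(false, false, true, false));
        System.out.println(find(false, false, false, true));
    }
}
```

Each earlier attempt is cheaper than the next, which is why a new socket is opened only after every reuse option has been tried.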

client.networkInterceptors

These are called network interceptors. Like client.interceptors, they are user-defined and are stored as a list in OkHttpClient.

So what’s the difference between the two interceptors?

The difference comes from their positions in the chain. Application interceptors sit at the very front, so they always run, and exactly once per call. Network interceptors sit in the second-to-last position, so they are not guaranteed to run, and they may also run more than once. For example, if RetryAndFollowUpInterceptor fails, or CacheInterceptor returns directly from the cache, the network interceptors are not executed at all. When a request is retried or a redirect is followed, they run once for every network request.
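
A toy model makes the difference in invocation counts concrete. The class InterceptorPositionSketch below is hypothetical and collapses the chain into one method: a loop stands in for the retry stage, a boolean stands in for a cache hit, and serverFailures is an assumed number of failed attempts before the simulated server answers.

```java
// Hypothetical model of where the two interceptor kinds sit; not OkHttp code.
class InterceptorPositionSketch {
    int applicationCalls = 0;
    int networkCalls = 0;

    String execute(boolean cacheHit, int serverFailures) {
        applicationCalls++;                  // application interceptor: front of the chain, runs once
        int remainingFailures = serverFailures;
        while (true) {                       // stands in for the retry / follow-up stage
            if (cacheHit) {
                return "cached response";    // cache stage short-circuits, network stage never reached
            }
            networkCalls++;                  // network interceptor: runs once per network attempt
            if (remainingFailures > 0) {
                remainingFailures--;         // simulated recoverable failure, retry
                continue;
            }
            return "network response";
        }
    }

    public static void main(String[] args) {
        InterceptorPositionSketch sketch = new InterceptorPositionSketch();
        sketch.execute(false, 2);
        System.out.println("application=" + sketch.applicationCalls + ", network=" + sketch.networkCalls);
    }
}
```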

CallServerInterceptor

At this point, the client and the server have established a connection, and it is time to send the request header and request body to the server and parse the response returned by the server.

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    try {
      // Write the request header
      exchange.writeRequestHeaders(request)
      // If the method permits a request body (anything but GET or HEAD) and the request body is not null
      if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        // When the request carries the header "Expect: 100-continue", wait for the server to answer "HTTP/1.1 100 Continue" before sending the request body.
        // For example, a POST request sends its headers first and sends the body only after receiving 100 Continue.
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
          // Refresh the request, that is, send the request header
          exchange.flushRequest()
          // Parse the response header
          responseBuilder = exchange.readResponseHeaders(expectContinue = true)
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
        // Write the request body
        if (responseBuilder == null) {
          if (requestBody.isDuplex()) {
            // If the request body is duplex, send the request headers first and write the request body afterwards
            exchange.flushRequest()
            val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
            // Write the request body
            requestBody.writeTo(bufferedRequestBody)
          } else {
            // Write the request body if the "Expect: 100-continue" expectation was met
            val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
            requestBody.writeTo(bufferedRequestBody)
            bufferedRequestBody.close()
          }
          ... omit code ...
      // The request has been fully written
      exchange.finishRequest()
    ... omit code ...
    try {
      if (responseBuilder == null) {
        // Read the response headers
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        ... omit code ...
      // Build the response
      var response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      var code = response.code
      ... omit code ...
      return response
      ... omit code ...

To recap: the request headers are written first, then the request body is written if the conditions are met, and the request is finished. The response headers returned by the server are then parsed, and a new response is built and returned. CallServerInterceptor is the last interceptor in the chain, so it does not call chain.proceed() to go further down. Instead, the response it builds is passed back up through each interceptor in the chain.
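
The condition under which the request body is written can be captured in a few lines. The sketch below is an assumption-laden simplification: ExpectContinueSketch and shouldWriteBody are hypothetical names, and interimStatus stands for the status of the early response that is read after the headers are flushed, which is only consulted when the request carries "Expect: 100-continue".

```java
// Hypothetical model of the body-writing decision; not OkHttp code.
class ExpectContinueSketch {

    // Mirrors the rule used in the listing: GET and HEAD carry no request body.
    static boolean permitsRequestBody(String method) {
        return !(method.equals("GET") || method.equals("HEAD"));
    }

    static boolean shouldWriteBody(String method, boolean hasBody, String expectHeader, int interimStatus) {
        if (!permitsRequestBody(method) || !hasBody) {
            return false;                         // nothing to send
        }
        if ("100-continue".equalsIgnoreCase(expectHeader)) {
            return interimStatus == 100;          // send the body only after "100 Continue"
        }
        return true;                              // no expectation: send the body right away
    }

    public static void main(String[] args) {
        System.out.println(shouldWriteBody("POST", true, "100-continue", 100));
        System.out.println(shouldWriteBody("POST", true, "100-continue", 417));
    }
}
```

When the server answers with anything other than 100, that early response becomes the final response and the body is never transmitted.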

conclusion

We’ve analyzed the flow of requests, both synchronous and asynchronous, and carefully analyzed each interceptor in the interceptor responsibility chain. Here is a flow chart as a brief summary; you can use it to walk through the whole process once more.

reflection

Design patterns

  1. Builder pattern: OkHttpClient, Request and Response all use it. These classes have many parameters, and users choose only the ones they need to build the instance they want, so the builder pattern is very common in open source libraries.
  2. Factory method pattern: helps create complex objects, for example OkHttpClient.newCall(request: Request) creates the Call object.
  3. Chain of responsibility pattern: the interceptor chain is composed of 7 interceptors (the five built-in ones plus the user-defined application and network interceptors). They are executed from top to bottom, and once the Response is obtained it is passed back up from the bottom.
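
The chain of responsibility pattern in the last item can be sketched in a few lines of Java. The types below (SketchInterceptor, SketchChain, ChainSketchDemo) are hypothetical and use plain strings for the request and response. The idea borrowed from the interceptor chain is that proceed() builds a chain pointing at the next index and hands it to the current interceptor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo; the types in this block are illustrative and not OkHttp code.
class ChainSketchDemo {

    static String run(List<String> log) {
        List<SketchInterceptor> interceptors = new ArrayList<>();
        interceptors.add(chain -> {
            log.add("A down");
            String response = chain.proceed(chain.request + ">A");
            log.add("A up");
            return response + "<A";
        });
        interceptors.add(chain -> {
            log.add("B down");
            String response = chain.proceed(chain.request + ">B");
            log.add("B up");
            return response + "<B";
        });
        // The last interceptor does not call proceed(); it produces the response.
        interceptors.add(chain -> {
            log.add("server");
            return "response(" + chain.request + ")";
        });
        return new SketchChain(interceptors, 0, "req").proceed("req");
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(log));
        System.out.println(log);
    }
}

interface SketchInterceptor {
    String intercept(SketchChain chain);
}

final class SketchChain {
    private final List<SketchInterceptor> interceptors;
    private final int index;
    final String request;

    SketchChain(List<SketchInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    // Hand the request to the interceptor at the current index, giving it a chain that points at the next one.
    String proceed(String request) {
        SketchChain next = new SketchChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}
```

Each interceptor sees the request on the way down and the response on the way back up, so it can modify either.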

Thread safety

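In the AsyncCall class, the callsPerHost variable is declared @Volatile and holds an AtomicInteger, which together ensure thread safety under multithreading. Why both? @Volatile makes a reassignment of the reference visible to other threads, and AtomicInteger makes increments and decrements of the counter atomic. For more detail, refer to my other article about Volatile for Android programmers, which I won't go into here.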

inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    // The number of requests for the same domain name. Volatile + AtomicInteger Guarantees visibility and atomicity in multiple threads
    @Volatile var callsPerHost = AtomicInteger(0)
      private set
    ... omit code ...
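
The combination can be tried out in isolation. The sketch below is an illustration under assumptions, not the AsyncCall implementation: the class CallsPerHostSketch and the hammer method are hypothetical. Two objects share one AtomicInteger through a volatile reference, and several threads increment it concurrently.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a volatile reference to a shared AtomicInteger.
class CallsPerHostSketch {

    // volatile: a reassignment of the reference is visible to other threads.
    // AtomicInteger: increments of the shared counter are atomic.
    volatile AtomicInteger callsPerHost = new AtomicInteger(0);

    // Share the counter of another call to the same host.
    void reuseCallsPerHostFrom(CallsPerHostSketch other) {
        this.callsPerHost = other.callsPerHost;
    }

    // Start several threads that increment the shared counter and return the final value.
    static int hammer(int threads, int incrementsPerThread) {
        CallsPerHostSketch first = new CallsPerHostSketch();
        CallsPerHostSketch second = new CallsPerHostSketch();
        second.reuseCallsPerHostFrom(first);

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            CallsPerHostSketch target = (t % 2 == 0) ? first : second;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    target.callsPerHost.incrementAndGet();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return first.callsPerHost.get();
    }

    public static void main(String[] args) {
        System.out.println(hammer(4, 10_000));
    }
}
```

With a plain int and `count++` instead of incrementAndGet(), concurrent updates could be lost, because a read-modify-write on an int is not atomic.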

The data structure

Why do readyAsyncCalls, runningAsyncCalls and runningSyncCalls use ArrayDeque?

First, they are used to store network requests. These requests need to be served first come, first served, so they are kept in queues. When enqueue is executed, readyAsyncCalls is iterated and the calls that qualify are moved into runningAsyncCalls. ArrayDeque is backed by an array, so this iteration is more efficient than it would be with a linked list.
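
The hand-over between the two queues might look like the following sketch. It is a simplification under assumptions: DispatcherQueueSketch is a hypothetical class, calls are plain strings, and only a single maxRequests limit is modelled, without the per-host limit or the executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the ready/running queues; not OkHttp code.
class DispatcherQueueSketch {
    final Deque<String> readyAsyncCalls = new ArrayDeque<>();
    final Deque<String> runningAsyncCalls = new ArrayDeque<>();
    final int maxRequests;

    DispatcherQueueSketch(int maxRequests) {
        this.maxRequests = maxRequests;
    }

    void enqueue(String call) {
        readyAsyncCalls.add(call);   // queue at the tail: first come, first served
        promote();
    }

    void finished(String call) {
        runningAsyncCalls.remove(call);
        promote();                   // a slot became free, let the next call run
    }

    // Move calls from ready to running, oldest first, while capacity remains.
    List<String> promote() {
        List<String> promoted = new ArrayList<>();
        Iterator<String> it = readyAsyncCalls.iterator();
        while (it.hasNext()) {
            if (runningAsyncCalls.size() >= maxRequests) break;
            String call = it.next();
            it.remove();
            runningAsyncCalls.add(call);
            promoted.add(call);
        }
        return promoted;
    }

    public static void main(String[] args) {
        DispatcherQueueSketch dispatcher = new DispatcherQueueSketch(2);
        dispatcher.enqueue("a");
        dispatcher.enqueue("b");
        dispatcher.enqueue("c");
        System.out.println(dispatcher.runningAsyncCalls + " / " + dispatcher.readyAsyncCalls);
    }
}
```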

At the end

That concludes this walkthrough of the OkHttp source code.

In fact, the best way to learn source code is to clone it yourself, start from the way the library is used, and follow the flow step by step. Here you can refer to the OkHttp detailed code comment.

In fact, the biggest purpose of sharing articles is to have someone point out my mistakes. If you find any, please point them out without reservation; I will take them with an open mind. In addition, if you think this article is good and helpful, please give me a like as encouragement, thank you ~ Peace~!