This article is based on an analysis of OkHttp 4.9.0.

What is OkHttp?

OkHttp is an application-layer framework that clients use to send HTTP requests and process server responses. The popular Retrofit library is also built on top of OkHttp. So what are the advantages of OkHttp? Let's take a look:

  • Seamless support for GZIP reduces data traffic
  • Caching response data reduces repeated network requests
  • Failed requests are automatically retried against the host's other IP addresses, and redirects are followed automatically.
  • Connection pooling reduces request latency when HTTP/2 is not available.

Usage

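OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
      .url(url)
      .build();
// Synchronous: blocks until the response is available
Response response = client.newCall(request).execute();
// Asynchronous: enqueue() returns void and reports the result to a Callback
// (callback is an okhttp3.Callback instance you supply)
client.newCall(request).enqueue(callback);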

The synchronous and asynchronous paths call execute and enqueue respectively. Let's look at the implementation:

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

As you can see, whether synchronous or asynchronous, the dispatcher is used.

The Dispatcher

The dispatcher owns a thread pool, which is used to run asynchronous requests. Let's take a look at the main member variables of Dispatcher:

 // Maximum number of asynchronous requests running concurrently
 var maxRequests = 64
 // Maximum number of concurrent requests per host
 var maxRequestsPerHost = 5
 // Callback run when the dispatcher becomes idle
 var idleCallback: Runnable? = null
 // Thread pool for asynchronous requests
 private var executorServiceOrNull: ExecutorService? = null
 val executorService: ExecutorService  // getter omitted in this excerpt
 // Asynchronous calls waiting to run
 private val readyAsyncCalls = ArrayDeque<AsyncCall>()
 // Asynchronous calls currently running
 private val runningAsyncCalls = ArrayDeque<AsyncCall>()
 // Synchronous calls currently running
 private val runningSyncCalls = ArrayDeque<RealCall>()

A synchronous request

 @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

Since the request is synchronous, the dispatcher has nothing to do except record the call in the queue of running synchronous calls.

An asynchronous request

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

As you can see, when executing an asynchronous request, we put the request on a wait queue and then call promoteAndExecute. So what does this code do?

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

A waiting call is moved to the running queue and started only while fewer than 64 asynchronous calls (maxRequests) are running in total and fewer than 5 of them (maxRequestsPerHost) target the same host.
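The promotion rule can be tried out in isolation. The following is a minimal single-threaded sketch in plain Java, not OkHttp's Dispatcher: the class DispatcherSketch and its fields are hypothetical names, and calls are represented only by their host string.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Simplified, single-threaded model of the promotion rule; not OkHttp's real Dispatcher.
final class DispatcherSketch {
    final int maxRequests;
    final int maxRequestsPerHost;
    final Deque<String> readyHosts = new ArrayDeque<>();        // waiting calls, identified by host
    final List<String> runningHosts = new ArrayList<>();        // running calls, identified by host
    final Map<String, Integer> callsPerHost = new HashMap<>();  // running calls per host

    DispatcherSketch(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    void enqueue(String host) {
        readyHosts.add(host);
    }

    // Moves eligible calls from the ready queue to the running list.
    List<String> promote() {
        List<String> promoted = new ArrayList<>();
        Iterator<String> i = readyHosts.iterator();
        while (i.hasNext()) {
            String host = i.next();
            if (runningHosts.size() >= maxRequests) break;                          // global limit reached
            if (callsPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host limit reached
            i.remove();
            callsPerHost.merge(host, 1, Integer::sum);
            runningHosts.add(host);
            promoted.add(host);
        }
        return promoted;
    }

    // Seven calls to one host and one call to another, with the limits 64 and 5.
    static DispatcherSketch demo() {
        DispatcherSketch d = new DispatcherSketch(64, 5);
        for (int n = 0; n < 7; n++) d.enqueue("a.example");
        d.enqueue("b.example");
        d.promote();
        return d;
    }

    public static void main(String[] args) {
        DispatcherSketch d = demo();
        System.out.println("running=" + d.runningHosts.size() + " waiting=" + d.readyHosts.size());
    }
}
```

In this run the per-host limit holds back two of the calls to a.example, while the call to b.example is still promoted because the loop uses continue rather than break for the host check.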

When a request completes, the dispatcher is called again, this time through its finished method. Let's look at it:

  /** Used by [AsyncCall.run] to signal completion. */
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

Whether the call was synchronous or asynchronous, once it completes it is removed from its queue, and promoteAndExecute runs again to start any waiting calls. If no calls are running afterwards and an idle callback is set, the callback is run.

Request process

We have now walked through the dispatcher's synchronous and asynchronous handling, but the actual request flow lives in RealCall. Let's look at its synchronous and asynchronous methods:

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

The synchronous request simply returns the result of getResponseWithInterceptorChain(). Now let's look at the asynchronous request:

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

An AsyncCall is created to handle the asynchronous request and handed to the dispatcher. AsyncCall is a Runnable, so let's look at its run method:

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }

Here too, the response is obtained from getResponseWithInterceptorChain(). This method is the heart of OkHttp: it runs the request through the chain of interceptors.

The interceptor

Both synchronous and asynchronous requests end up in getResponseWithInterceptorChain(). This method uses the chain of responsibility pattern to split the whole request into a series of interceptor calls. That keeps each interceptor's responsibility and logic simple, and it lets users extend the chain with custom interceptors. If you are not familiar with the chain of responsibility pattern, read up on it first.

How to intercept

Interception is driven by RealInterceptorChain, specifically by its proceed method. Let's look at this method:


  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Comment 1
    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

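At comment 1, the chain creates a copy of itself that points at the next interceptor, then calls the current interceptor's intercept method with that copy. Each interceptor in turn calls proceed on the chain it receives, so the request travels down the list one interceptor at a time, and the response is passed back up layer by layer.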
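The index-advancing mechanism described above can be reduced to a few lines. This is an illustrative sketch in plain Java, assuming requests and responses are plain strings; ChainSketch, Interceptor and Chain here are hypothetical stand-ins, not OkHttp's types.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative chain-of-responsibility sketch; these types are hypothetical, not OkHttp's.
final class ChainSketch {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        // Runs the interceptor at `index`, handing it a chain that points at index + 1.
        String proceed(String request) {
            if (index >= interceptors.size()) {
                throw new IllegalStateException("no interceptor left to handle the request");
            }
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    static String run() {
        List<Interceptor> interceptors = new ArrayList<>();
        // Each of the first two interceptors edits the request, proceeds, then wraps the response.
        interceptors.add(chain -> "A(" + chain.proceed(chain.request + ">a") + ")");
        interceptors.add(chain -> "B(" + chain.proceed(chain.request + ">b") + ")");
        // The last interceptor produces the response and never calls proceed.
        interceptors.add(chain -> "response to [" + chain.request + "]");
        return new Chain(interceptors, 0, "req").proceed("req");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints A(B(response to [req>a>b]))
    }
}
```

The nesting of the output shows the two directions of travel: the request is modified on the way down, and the response is wrapped on the way back up.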

Interceptor analysis

Now let's look at what each interceptor does, starting with the getResponseWithInterceptorChain() method itself:

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    // Application interceptors registered by the user
    interceptors += client.interceptors
    // Handles retries after failures and follow-up requests such as redirects
    interceptors += RetryAndFollowUpInterceptor(client)
    // Bridges application code to network code by completing the request headers
    interceptors += BridgeInterceptor(client.cookieJar)
    // Handles the cache
    interceptors += CacheInterceptor(client.cache)
    // Establishes the connection to the server
    interceptors += ConnectInterceptor
    // Network interceptors registered by the user (skipped for WebSockets)
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    // Talks to the server: writes the request data and parses the response data
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

RetryAndFollowUpInterceptor

This interceptor handles retries and redirects. A request that succeeds on the first attempt and needs no follow-up passes straight through it. Let's look at retries first.

        try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          // Connecting via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          // An exception occurred while communicating with the server. The request may have already been sent.
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

Both exception types are handled the same way: the recover method decides whether to retry. If it returns true, the request is retried. Let's look at recover:

  private fun recover(
    e: IOException,
    call: RealCall,
    userRequest: Request,
    requestSendStarted: Boolean
  ): Boolean {
    // The application has disabled retries
    if (!client.retryOnConnectionFailure) return false

    // The request body cannot be sent a second time
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

    // The exception is fatal, so retrying would not help
    if (!isRecoverable(e, requestSendStarted)) return false

    // There are no more routes to try
    if (!call.retryAfterFailure()) return false

    // For failure recovery, use the same route selector with a new connection.
    return true
  }

Even if the request completes without an exception, the response is not necessarily final: we still have to decide whether a follow-up request, such as a redirect, is needed. That decision is made by followUpRequest. Let's take a look:

  @Throws(IOException::class)
  private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
    val route = exchange?.connection?.route()
    val responseCode = userResponse.code

    val method = userResponse.request.method
    when (responseCode) {
      // 407: a proxy is in use and it requires authentication
      HTTP_PROXY_AUTH -> {
        val selectedProxy = route!!.proxy
        if (selectedProxy.type() != Proxy.Type.HTTP) {
          throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
        }
        return client.proxyAuthenticator.authenticate(route, userResponse)
      }
      // 401: the server requires authentication
      HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
      // 300, 301, 302, 303, 307, 308: a redirect is required
      HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
        return buildRedirectRequest(userResponse, method)
      }
      // 408: the request timed out
      HTTP_CLIENT_TIMEOUT -> {
        // The application has disabled retries: return null
        if (!client.retryOnConnectionFailure) {
          return null
        }
        // A one-shot request body cannot be sent again: return null
        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) {
          return null
        }
        // We already retried once and timed out again: give up
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
          return null
        }
        // The server asked us to wait (Retry-After > 0): do not retry
        if (retryAfter(userResponse, 0) > 0) {
          return null
        }

        return userResponse.request
      }
      // 503: the service is unavailable; retry only if the server sent Retry-After: 0
      HTTP_UNAVAILABLE -> {
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
          return null
        }

        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
          return userResponse.request
        }

        return null
      }
      // 421: OkHttp can coalesce HTTP/2 connections across different domain names;
      // when the server answers 421, retry on a separate connection.
      HTTP_MISDIRECTED_REQUEST -> {
        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) {
          return null
        }

        if (exchange == null || !exchange.isCoalescedConnection) {
          return null
        }

        exchange.connection.noCoalescedConnections()
        return userResponse.request
      }

      else -> return null
    }
  }

If this method returns null, no follow-up is needed and the response is returned as is. If it returns a non-null Request, that request is sent in the next round.
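The "return null to stop, return a request to go again" contract can be sketched as a loop. This is a simplified model in plain Java under several assumptions: the server is a hypothetical in-memory map, a response starting with "redirect:" stands in for a 3xx status, and the cap of 20 follow-ups is a constant chosen for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a follow-up loop; not OkHttp's RetryAndFollowUpInterceptor.
final class FollowUpSketch {
    static final int MAX_FOLLOW_UPS = 20; // cap assumed for this sketch

    // Returns the URL to request next, or null when the response is final.
    static String followUpRequest(String response) {
        String prefix = "redirect:";
        return response.startsWith(prefix) ? response.substring(prefix.length()) : null;
    }

    static String fetch(Map<String, String> server, String url) {
        int followUps = 0;
        while (true) {
            String response = server.get(url);
            if (response == null) throw new IllegalStateException("unknown URL: " + url);

            String followUp = followUpRequest(response);
            if (followUp == null) return response; // nothing to follow: this is the final response

            if (++followUps > MAX_FOLLOW_UPS) {
                throw new IllegalStateException("Too many follow-up requests: " + followUps);
            }
            url = followUp; // issue the follow-up in the next iteration
        }
    }

    static String demo() {
        Map<String, String> server = new HashMap<>(); // hypothetical in-memory "server"
        server.put("/old", "redirect:/new");
        server.put("/new", "redirect:/final");
        server.put("/final", "hello");
        return fetch(server, "/old");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints hello
    }
}
```

The cap matters because two URLs that redirect to each other would otherwise keep the loop running forever.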

BridgeInterceptor

The bridge interceptor essentially completes the HTTP request headers: it turns the application's request into a network request by adding the headers every request needs before it reaches the server. I won't go into more detail here. The request headers this interceptor currently handles are:

Request header                       Description
Content-Type                         Type of the request body
Content-Length / Transfer-Encoding   How the request body is delimited
Host                                 Host being requested
Connection: Keep-Alive               Keep the connection alive for reuse
Accept-Encoding: gzip                The client accepts gzip-compressed responses
Cookie                               Cookies sent with the request
User-Agent                           Information identifying the client

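The header completion listed in the table can be illustrated with a small model. This is a sketch in plain Java, not BridgeInterceptor itself: headers are held in an ordinary map (so names are case-sensitive here, unlike real HTTP headers), the default User-Agent value is hypothetical, and details such as cookies and Content-Type are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of request header completion; not OkHttp's BridgeInterceptor.
final class BridgeSketch {
    // contentLength < 0 means the body length is not known in advance.
    static Map<String, String> withDefaults(Map<String, String> userHeaders, String host, long contentLength) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);
        if (contentLength >= 0) {
            headers.put("Content-Length", Long.toString(contentLength));
            headers.remove("Transfer-Encoding");
        } else {
            headers.put("Transfer-Encoding", "chunked");
            headers.remove("Content-Length");
        }
        // Only fill in what the caller did not set explicitly.
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");
        headers.putIfAbsent("Accept-Encoding", "gzip");
        headers.putIfAbsent("User-Agent", "sketch-client/1.0"); // hypothetical default value
        return headers;
    }

    static Map<String, String> demo() {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("User-Agent", "my-app/2.0");
        return withDefaults(user, "example.com", 12);
    }

    public static void main(String[] args) {
        demo().forEach((name, value) -> System.out.println(name + ": " + value));
    }
}
```

Using putIfAbsent keeps any header the caller set, which is why the User-Agent from the demo survives while the other defaults are added.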
CacheInterceptor

The cache interceptor handles everything related to caching. If a cached response exists locally and the caching rules allow it to be used for the request, it is returned directly without contacting the server. Let's look at its intercept method:

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    // A cache candidate exists but the strategy rejected it, so it is not applicable: close it
    if (cacheCandidate != null && cacheResponse == null) {
      cacheCandidate.body?.closeQuietly()
    }

    // Network use is forbidden and there is no usable cached response: return 504
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    // No network request is needed: return the cached response
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    // A network request is needed. With a cached response this is a conditional hit;
    // with a cache configured but no cached response it is a cache miss.
    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // 304 Not Modified: the cached response is still valid, so refresh it and return it
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      // If the response has a body and is cacheable, store it for later use
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Add it to the cache
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            listener.cacheMiss(call)
          }
        }
      }

      // If the request method invalidates cached data, remove the entry from the cache
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

The cache interceptor's logic is fairly simple, if slightly convoluted. Whether to use the cache or go to the server is decided by CacheStrategy, in this line of the interceptor:

val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()

compute() delegates the actual decision to computeCandidate(). Let's go in and see:

    private fun computeCandidate(): CacheStrategy {
      // There is no cached response: go to the network
      if (cacheResponse == null) {
        return CacheStrategy(request, null)
      }

      // The required TLS handshake is missing from the cached response: go to the network
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
      }

      // The response may not be cached at all: go to the network
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
      }

      val requestCaching = request.cacheControl
      // The request says no-cache, or it already carries its own conditions: go to the network
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      }

      val responseCaching = cacheResponse.cacheControl

      // Age of the cached response, from creation to now
      val ageMillis = cacheResponseAge()
      // How long the response stays fresh
      var freshMillis = computeFreshnessLifetime()
      // If the request specifies max-age, use the smaller of the two lifetimes
      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      // min-fresh: the request wants the response to stay fresh for at least this long
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      // Cache-Control: must-revalidate (response): a stale entry must be revalidated with the origin server.
      // Cache-Control: max-stale (request): a stale entry may still be used for the given number of seconds
      // after it expires; without a value, any staleness is accepted.
      // must-revalidate takes precedence, so max-stale is read only when the response does not set it.
      var maxStaleMillis: Long = 0
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }

      // The cache can be used when the response does not demand revalidation (no-cache) and
      // age + min-fresh < freshness lifetime + max-stale
      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        return CacheStrategy(null, builder.build())
      }

      // Otherwise build a conditional request from the validators of the cached response
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag
        }

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString
        }

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }

        else -> return CacheStrategy(request, null)
      }

      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
      return CacheStrategy(conditionalRequest, cacheResponse)
    }

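The freshness test in computeCandidate comes down to one inequality, age + min-fresh < freshness lifetime + max-stale, which is easier to follow with concrete numbers. The sketch below is a simplified model in plain Java: FreshnessSketch and its Decision enum are hypothetical names, and GO_TO_NETWORK covers both the conditional and the unconditional request.

```java
// Worked example of the freshness inequality; a simplified model, not OkHttp's CacheStrategy.
final class FreshnessSketch {
    enum Decision { USE_CACHE, USE_STALE_CACHE, GO_TO_NETWORK }

    static Decision decide(long ageMillis, long freshMillis, long minFreshMillis,
                           long maxStaleMillis, boolean responseNoCache) {
        if (!responseNoCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            // Usable, but possibly only thanks to max-stale (the "Response is stale" warning case).
            boolean stale = ageMillis + minFreshMillis >= freshMillis;
            return stale ? Decision.USE_STALE_CACHE : Decision.USE_CACHE;
        }
        return Decision.GO_TO_NETWORK;
    }

    public static void main(String[] args) {
        // 30 s old, fresh for 60 s: 30 < 60, so the cache is used.
        System.out.println(decide(30_000, 60_000, 0, 0, false));
        // 70 s old, fresh for 60 s, max-stale 20 s: 70 < 80 but 70 >= 60, so it is used as stale.
        System.out.println(decide(70_000, 60_000, 0, 20_000, false));
        // 70 s old, fresh for 60 s, no max-stale: 70 < 60 is false, so go to the network.
        System.out.println(decide(70_000, 60_000, 0, 0, false));
        // 50 s old, fresh for 60 s, min-fresh 20 s: 70 < 60 is false, so go to the network.
        System.out.println(decide(50_000, 60_000, 20_000, 0, false));
    }
}
```

The last case shows that min-fresh works against the cache: a response that is still fresh now is rejected because it would not stay fresh for the extra 20 seconds the request asks for.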
ConnectInterceptor

This interceptor is responsible for establishing the connection. It sets up the TCP connection (HTTP) or TLS connection (HTTPS) needed for the request and creates the corresponding ExchangeCodec, which encodes HTTP requests and decodes responses. Let's look at the code:

    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)

That is all there is to it. The real work is delegated to RealCall's initExchange method:

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }

    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

This method finds a new or pooled connection to carry the upcoming request and response. Let's look at the lookup logic:

  fun find(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

It calls findHealthyConnection, which looks for a usable connection. Let's move on:

  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      // Find a connection
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Make sure the connection found is usable, then return it
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      candidate.noNewExchanges()

      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      throw IOException("exhausted all routes")
    }
  }

Let's look at how a connection is found:

  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // Try to reuse the connection already attached to the call
    val callConnection = call.connection
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection was not released, reuse it
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // The call's connection was released: close its socket
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // A new connection is needed, so reset the failure counters
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // Try to find a connection in the connection pool and return it if one is found
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Nothing in the pool: work out which route to try next
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      routes = null
      route = routeSelection!!.next()
    } else {
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of routes, try the pool again
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // Make a new connection
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If a multiplexed connection to this host has appeared in the pool in the meantime,
    // use that one and close the connection we just made.
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Otherwise put the new connection into the pool
    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }
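Stripped of routes, coalescing and locking, the lookup in findConnection follows a "reuse from the pool, otherwise connect and pool" order. The sketch below models only that order in plain Java. PoolSketch and its Connection class are hypothetical, connections are matched by host and port alone, and creating an object stands in for the actual TCP/TLS connect.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of pool-first connection lookup; not OkHttp's ExchangeFinder or RealConnectionPool.
final class PoolSketch {
    static final class Connection {
        final String host;
        final int port;
        boolean healthy = true;

        Connection(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private final List<Connection> pool = new ArrayList<>();
    int created = 0; // how many connections had to be established

    Connection acquire(String host, int port) {
        // Drop connections that can no longer carry requests.
        pool.removeIf(c -> !c.healthy);

        // 1. Reuse a pooled connection to the same host and port if there is one.
        for (Connection c : pool) {
            if (c.host.equals(host) && c.port == port) return c;
        }

        // 2. Otherwise establish a new connection and pool it for later calls.
        Connection fresh = new Connection(host, port);
        created++;
        pool.add(fresh);
        return fresh;
    }

    static int demo() {
        PoolSketch p = new PoolSketch();
        Connection first = p.acquire("example.com", 443);  // new connection
        p.acquire("example.com", 443);                     // reused
        p.acquire("example.org", 443);                     // different host: new connection
        first.healthy = false;
        p.acquire("example.com", 443);                     // pooled one is unhealthy: new connection
        return p.created;
    }

    public static void main(String[] args) {
        System.out.println("connections created: " + demo()); // prints connections created: 3
    }
}
```

Four acquisitions cost only three connects here, and the saving grows with every further request to a host that already has a healthy pooled connection.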

Let’s read on to see how it actually makes the connection:

fun connect( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, call: Call, eventListener: EventListener) {
    check(protocol == null) { "already connected" }

    var routeException: RouteException? = null
    val connectionSpecs = route.address.connectionSpecs
    val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)

    if (route.address.sslSocketFactory == null) {
      if (ConnectionSpec.CLEARTEXT !in connectionSpecs) {
        throw RouteException(UnknownServiceException(
            "CLEARTEXT communication not enabled for client"))
      }
      val host = route.address.url.host
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw RouteException(UnknownServiceException(
            "CLEARTEXT communication to $host not permitted by network security policy"))
      }
    } else {
      if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
        throw RouteException(UnknownServiceException(
            "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"))
      }
    }

    while (true) {
      try {
        // If a communication tunnel is needed, a tunnel is established. That is, HTTPS is accessed through an HTTP proxy
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break
          }
        } else {
          // Establish a socket connection.
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        socket?.closeQuietly()
        rawSocket?.closeQuietly()
        socket = null
        rawSocket = null
        source = null
        sink = null
        handshake = null
        protocol = null
        http2Connection = null
        allocationLimit = 1

        eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)

        if (routeException == null) {
          routeException = RouteException(e)
        } else {
          routeException.addConnectException(e)
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException
        }
      }
    }

    if (route.requiresTunnel() && rawSocket == null) {
      throw RouteException(ProtocolException(
          "Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
    }

    idleAtNs = System.nanoTime()
  }

At this point, the connection process ends.
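The `while (true)` loop in `connect` follows a common pattern: try, remember the failure, and retry only if retries are enabled and a fallback is left. A minimal sketch of that pattern in plain Kotlin follows. `connectWithFallback` and its parameters are hypothetical, and the real `connectionFailed` check also inspects the exception itself, which this sketch does not model.

```kotlin
import java.io.IOException

// Hypothetical helper: tries each option in order and returns the first success.
// The options stand in for the fallback connection specs that connect() can retry with.
fun <T> connectWithFallback(
  options: List<String>,
  retryEnabled: Boolean,
  attempt: (String) -> T
): T {
  require(options.isNotEmpty()) { "no options to try" }
  var firstFailure: IOException? = null
  for ((index, option) in options.withIndex()) {
    try {
      return attempt(option)
    } catch (e: IOException) {
      // Keep the first failure as the primary exception and attach later ones to it.
      val primary = firstFailure?.also { it.addSuppressed(e) } ?: e
      firstFailure = primary
      // Give up when retries are disabled or no fallback option remains.
      if (!retryEnabled || index == options.lastIndex) throw primary
    }
  }
  error("unreachable: the last failed attempt always throws")
}
```

Called with two options where the first attempt throws an `IOException`, the helper returns the result of the second attempt. With `retryEnabled = false` it rethrows the first failure immediately.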

CallServerInterceptor

This interceptor performs the actual request and response I/O: it writes the request to the socket and reads the response back from it. In other words, it sends the request to the server, waits for the reply, and parses that reply into a Response. Here is the code:

  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    // Check whether the method permits a request body and a body is present (not simply whether it is POST)
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If the request has an "Expect: 100-continue" header, wait for the server's "100 Continue"
      // before sending the body. If the server answers with something else, return that
      // response without sending the body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }

    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    // A 100 Continue is only an interim response, so read the response headers again
    // to get the actual response.
    if (code == 100) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }

With that, the HTTP request has been written out and the response has been parsed.
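The request-body branch of `intercept` is the easiest part to misread, so here is a small sketch of just that decision. `BodyAction`, `decideBodyAction` and `readEarlyStatus` are hypothetical names, and the GET/HEAD rule in `permitsRequestBody` is an assumption about what `HttpMethod.permitsRequestBody` checks. The lambda stands in for the blocking read of the server's early reply.

```kotlin
// Hypothetical summary of what happens to the request body; not an OkHttp type.
enum class BodyAction { NO_BODY, SEND_BODY, SKIP_BODY }

// Assumption: a body is permitted for every method except GET and HEAD.
fun permitsRequestBody(method: String): Boolean = method != "GET" && method != "HEAD"

// readEarlyStatus models waiting for the server's reply to "Expect: 100-continue".
// It is only invoked when that header is present.
fun decideBodyAction(
  method: String,
  hasBody: Boolean,
  expectHeader: String?,
  readEarlyStatus: () -> Int
): BodyAction {
  if (!permitsRequestBody(method) || !hasBody) return BodyAction.NO_BODY
  val expectsContinue = "100-continue".equals(expectHeader, ignoreCase = true)
  if (!expectsContinue) return BodyAction.SEND_BODY
  // 100 means "go ahead"; any other status is the final response and the body is skipped.
  return if (readEarlyStatus() == 100) BodyAction.SEND_BODY else BodyAction.SKIP_BODY
}
```

For example, a POST with `Expect: 100-continue` that is answered with 417 yields `SKIP_BODY`, which corresponds to the `exchange.noRequestBody()` branch in the code above.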

Conclusion

OkHttp's core functionality is implemented by five built-in interceptors, with two optional groups of developer-supplied interceptors around them. In chain order:

  • Application interceptors, registered by the developer with addInterceptor(Interceptor), run first, before all built-in interceptors. This is the place to add common parameters or headers.

  • RetryAndFollowUpInterceptor does some connection-related initialization, retries failed requests, and issues follow-up requests for redirects. As its name implies, it handles retries and follow-ups.

  • BridgeInterceptor builds the network request from the user's request and converts the network response into a response the user can consume, for example by adding the Content-Type and Content-Length headers and decompressing gzip bodies.

  • CacheInterceptor handles caching. It stores responses according to the OkHttpClient configuration and the cache policy, and returns a cached response without any network interaction when a usable local copy exists.

  • ConnectInterceptor establishes the TCP/TLS connection and provides the codec that encodes requests and decodes responses (ExchangeCodec in OkHttp 4.x, HttpCodec in earlier 3.x releases).

  • Network interceptors, registered by the developer with addNetworkInterceptor(Interceptor), work like application interceptors but serve a different purpose because of their position. At this point in the chain they see the request as it is actually sent over the network and the response as it is received, which makes them useful for network debugging.

  • CallServerInterceptor performs the actual network I/O: it writes the request to the socket and reads the response from it.
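The ordering in the list above matters because each interceptor wraps the ones after it. A minimal sketch of that chain-of-responsibility idea follows, with requests and responses reduced to plain strings. `SketchInterceptor`, `SketchChain`, `tagging`, `callServer` and `runChain` are hypothetical names for illustration and are not OkHttp's own types.

```kotlin
// Hypothetical miniature of an interceptor; requests and responses are plain strings.
fun interface SketchInterceptor {
  fun intercept(chain: SketchChain): String
}

class SketchChain(
  private val interceptors: List<SketchInterceptor>,
  private val index: Int,
  val request: String
) {
  // Hands the request to the next interceptor and returns its response.
  fun proceed(request: String): String {
    check(index < interceptors.size) { "the last interceptor must not call proceed()" }
    val next = SketchChain(interceptors, index + 1, request)
    return interceptors[index].intercept(next)
  }
}

// Marks the request on the way down and the response on the way back up.
fun tagging(name: String) = SketchInterceptor { chain ->
  val response = chain.proceed("${chain.request}>$name")
  "$response<$name"
}

// Stands in for CallServerInterceptor: it ends the chain instead of calling proceed().
val callServer = SketchInterceptor { chain -> "[${chain.request}]" }

fun runChain(interceptors: List<SketchInterceptor>, request: String): String =
    SketchChain(interceptors, 0, request).proceed(request)
```

Running `runChain(listOf(tagging("app"), tagging("bridge"), tagging("network"), callServer), "req")` shows the nesting: the request passes through the interceptors in list order, and the response passes back through them in reverse order.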

This article was first published on my personal blog: OkHttp source code analysis. For more articles, please follow my official account: code farming workplace