The internal key of OkHttp is the implementation of interceptor processing, which encapsulates the network request into each interceptor to achieve the decoupling of the various layers.

Let’s start with a request:

// Create okHttpClient object
OkHttpClient okHttpClient = new OkHttpClient.Builder()
                    .connectTimeout(6, TimeUnit.SECONDS)
                    .readTimeout(6, TimeUnit.SECONDS)
                    .build();

// Create a Request
Request request = new Request.Builder().url(strUrl).build();
// Initiate a synchronization request
try {
            Response response = client.newCall(request).execute();
            return response.body().string();
        } catch (IOException e) {
            e.printStackTrace();
        }
Copy the code

Internal request flow

The resulting code is that OkHttp calls newCall, returns a RealCall object, and calls the Execute synchronization method. RealCall is the class that manages network requests.

## RealCall   
override fun execute(a): Response {
    synchronized(this) { check(! executed) {"Already Executed" }
      executed = true
    }
    // Start timer, send
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      // Adapter request
      client.dispatcher.executed(this)
      // Invoke interceptor processing
      return getResponseWithInterceptorChain()
    } finally {
      // End of adapter
      client.dispatcher.finished(this)}}Copy the code

So now we know these three methods. Let’s look at them one by one

The excuted method of dispatcher is called. You can see that there are three array queues for processing

### Dispatcher
	// Prepare the asynchronous call queue
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  // Run the asynchronous call queue
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

 	// Run the synchronous call queue
  private val runningSyncCalls = ArrayDeque<RealCall>()
Copy the code

In our last step, we called

### Dispatcher

/ / synchronize
@Synchronized internal fun executed(call: RealCall) {
  // Put it into a running queue
    runningSyncCalls.add(call)
  }

/ / asynchronous
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if(! call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if(existingCall ! =null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }


//Dispath core logic
// called when enqueue is asynchronous
// Promote qualified calls from [readyAsyncCalls] to [runningAsyncCalls] and run them on the executor service. Cannot be called with synchronous because executing the call can call user code. @return true If the scheduler is currently running the call.
  private fun promoteAndExecute(a): Boolean{ assert(! Thread.holdsLock(this))

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Greater than the maximum number of requests
        if (asyncCall.callsPerHost().get() > =this.maxRequestsPerHost) continue // Greater than the maximum capacity
// Delete one from readyAsyncCalls when it is retrieved
        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        // Move to runningAsyncCalls
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    // Commit executables to the task thread pool
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

Copy the code

The promoteAndExecute method is called after enqueueing and the execution of the previous task has finished. Let’s look again at the use of the executorService thread pool.

### Dispatcher
@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        // A CachedThreadPool is used to quickly process a large number of short tasks
        executorServiceOrNull = ThreadPoolExecutor(0.Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher".false))}return executorServiceOrNull!!
    }
Copy the code

SynchronousQueue SynchronousQueue. This queue is like a baton that must be passed from queue to queue at the same time. Because CachedThreadPool threads are created indefinitely, there is no queue waiting, so SynchronousQueue is used.

Analysis, then look at the second method getResponseWithInterceptorChain ` ()

### RealCall  
@Throws(IOException::class)
  fun getResponseWithInterceptorChain(a): Response {
    // Create an array of interceptors
    val interceptors = mutableListOf<Interceptor>()
    // Add a custom interceptorInterceptors + = client. Interceptors 】// Add retry and redirection interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    // Add bridge interceptor
    interceptors += BridgeInterceptor(client.cookieJar)
    // Add a cache interceptor
    interceptors += CacheInterceptor(client.cache)
    // Add connection pool interceptor
    interceptors += ConnectInterceptor
    // Add network interceptor
    if(! forWebSocket) { interceptors += client.networkInterceptors }// Add network request interceptor
    interceptors += CallServerInterceptor(forWebSocket)

    // Creates a chain of interceptors, the final callers of all interceptors
    val chain = RealInterceptorChain(interceptors, transmitter, null.0, originalRequest, this,
        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

    var calledNoMoreExchanges = false
    try {
      // Start the interceptor chain
      val response = chain.proceed(originalRequest)
      .....
      return response
    } 
    ......
  }
Copy the code

As you can see, the interceptor set by addInterceptor runs before any other Intercept processing. This is followed by the default five interceptors.

The interceptor role
Apply interceptor Processing header information,
RetryAndFollowUpInterceptor Responsible for error retries, redirects
BridgeInterceptor Fill the HEAD header in the HTTP request protocol
CacheInterceptor A cache interceptor that does not initiate a network request if it hits the cache.
ConnectInterceptor Connection pool interceptor, the core of Okhttp
networkInterceptors Custom network interceptor for monitoring network transmission data
CallServerInterceptor Be responsible for sending and receiving the network

Interceptors will be analyzed. Run the stream first. Chain.proceed (originalRequest) is eventually executed to take a look at the internal implementation

  ### RealInterceptorChain
  @Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?).: Response {
    if (index >= interceptors.size) throw AssertionError()

    // The number of times that the current interceptor calls PROCEED
    calls++

    // Exchange transmits a single HTTP request and response pair
    // Verify that the incoming request is being invoked and that the previous network interceptor changed the URL or port,
    check(this.exchange == null || this.exchange.connection()!! .supportsUrl(request.url)) {"network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }

    // Make sure that chain.proceed() is the only call, and that interceptors after connectInteceptor can only call proceed once at most!!
    check(this.exchange == null || calls <= 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }

    // Create the next interceptor chain processing
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    
    // Fetch the index interceptor and call its Intercept method to pass in the new chain.
    val interceptor = interceptors[index]

    // The chain of responsibility design calls interceptors in turn
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    Make sure that the ConnectInterceptor and subsequent interceptors call proceed at least once!!
    check(exchange == null || index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"} check(response.body ! =null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
Copy the code

The chain of responsibility limits the number of times that proceed can be called by different interceptors. The ConnectInterceptor interceptor and subsequent interceptors can only be called once. Because the work of shaking hands, connecting, and sending requests takes place in these interceptors, a network request has been formally issued. An interceptor before that can proceed multiple times.

The main implementation is to create the next level of responsibility chain, and then take out the current interceptor, call the Intercept method and pass in the created responsibility chain, through the interception chain call level by level, and finally execute to the CallServerInterceptor intercept return Response object.

Cache interceptor

To understand how a cache interceptor works, you must first understand Http caching. Http caches fall into two categories (mandatory caches and comparison caches).

Thoroughly understand HTTP caching mechanisms and principles

Mandatory cache

Mandatory caching is when a network request response header indicates Expires or cache-Control indicates max-age information, and the client calculates that the Cache is not expired. In this case, the local Cache content can be used directly without actually making a network request.

Compared to the cache

The first time the browser requests data, the server returns the cache id along with the data to the client, which backs up both to the cache database. When requesting data again, the client sends the backup cache ID to the server. The server checks the backup cache ID. After the check succeeds, the server returns the 304 status code to inform the client that the backup data is available.

In contrast caching, the passing of cache identity needs to be understood.

  • Last-madified: When the server responds to a request, it tells the browser when the resource was Last modified.
  • If-modified-since: This field is used to notify the server of the last modification time of the resource returned by the server during the last request.
  • Etag :(priority over last-modified/if-modified-since) when the server responds to a request, it tells the browser the unique identity of the current resource on the server.
  • If-none-match: this field is used to notify the server of the unique identifier used by the client to cache data when a request is made again.

For mandatory caching, the server notifies the browser of a cache time, within which the next request is made, the cache will be used directly. For comparison cache, Etag and Last-Modified in the cache information are sent to the server through a request, which is verified by the server. When the 304 status code is returned, the browser directly uses the cache.

In CacheInterceptor, the Intercept method is called

###CacheInterceptor

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    // Get the response from the cache with the REQUESTD URL
    valcacheCandidate = cache? .get(chain.request())

    val now = System.currentTimeMillis()

    // Get the cache policy based on the Request candidate Response
    // The cache policy determines whether to use the cache
    / / strategy.net workRequest is null, do not use the network;
    //strategy.cacheResponse is null and no cache is used
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    // Whether the processing hits the network or the local cache
    // Update statistics according to the cache policy: number of requests, number of network requests, and number of cache usescache? .trackResponse(strategy)if(cacheCandidate ! =null && cacheResponse == null) {
      // There is a cache but it cannot be usedcacheCandidate.body? .closeQuietly() }// Network and cache cannot be used to return 504
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    // If you don't use the network cacheResponse is definitely not null, then use the cache
    if (networkRequest == null) {
      returncacheResponse!! .newBuilder() .cacheResponse(stripBody(cacheResponse)) .build() }//networkRequest is not null, so the interceptor chain continues processing
    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null&& cacheCandidate ! =null) { cacheCandidate.body? .closeQuietly() } }// If the network request returns 304, the server resource has not been modified
    // Update the cache based on the network response and cache response
    if(cacheResponse ! =null) {
      if(networkResponse? .code == HTTP_NOT_MODIFIED) {valresponse = cacheResponse.newBuilder() .headers(combine(cacheResponse.headers, networkResponse.headers)) .sentRequestAtMillis(networkResponse.sentRequestAtMillis) .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis) .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() networkResponse.body!! .close()// Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).cache!! .trackConditionalCacheHit() cache.update(cacheResponse, response)return response
      } else {
        // If it is not 304, the server resource is updated and the cache body is closedcacheResponse.body? .closeQuietly() } }// Pass in network data and cache
    valresponse = networkResponse!! .newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build()if(cache ! =null) {
      / / network response can cache (CacheStrategy isCacheable response processing, according to the code of the response and the response. CacheControl. NoStore)
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Write cache
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response)
      }

      //OkHttp only caches get requests by default, because GET requests are usually persistent, and POST is usually an interactive operation that doesn't make much sense to cache
	  // Remove the cache without a GET request
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.}}}return response
  }

Copy the code

Using the cache policy CacheStrategy is used to determine whether caching is used.

It can be understood as:

  1. Network and cache are down. Return 504
  2. If networkResponse is null, cacheResponse is definitely not null, so cache is used
  3. NetworkResponse is not null, regardless of whether cacheResponse is null, it directly requests the network
  4. CacheResponse is not null. If a network request returns 304, the server resource is not modified
  5. If it is not 304, the server resource is updated, and the network data and cache are passed in
  6. Returns if the network response is cacheablecacheWritingResponse
  7. The last step is to remove the cache instead of a GET request
// Get the cache policy based on the Request candidate Response
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
Copy the code

What’s being handled in CacheStrategy?

###CacheStrategy
// Given a request and cached response, this determines whether to use networking, caching, or both. Selecting a cache policy may add conditions to the request (such as the "if-Modified-since" header for the condition GET) or warnings to the cached response If cached data may be outdated.
// Initialize the interior
  class Factory(
    private val nowMillis: Long.internal val request: Request,
    private val cacheResponse: Response?
  ) {
    // The server time that the service caches the response
    private var servedDate: Date? = null
    private var servedDateString: String? = null

    // Last modified time of cached response
    private var lastModified: Date? = null
    private var lastModifiedString: String? = null

    // The expiration time of cached responses
    private var expires: Date? = null

    // Set to specify the timestamp when the cached Http request is first initiated
    private var sentRequestMillis = 0L

    // The timestamp of the first cached response received
    private var receivedResponseMillis = 0L

    // Cache the Etag of the response
    private var etag: String? = null

    // The lifetime of the cached response
    private var ageSeconds = -1

    /** * Returns true if computeFreshnessLifetime used a heuristic. If we used a heuristic to serve a * cached response older than 24 hours, we are required to attach a warning. */
    private fun isFreshnessLifetimeHeuristic(a): Boolean {
      returncacheResponse!! .cacheControl.maxAgeSeconds == -1 && expires == null
    }

    init {
      if(cacheResponse ! =null) {
        // Request time, response time, expiration time, modification time, and resource markers are all retrieved from cached responses
        this.sentRequestMillis = cacheResponse.sentRequestAtMillis
        this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis
        val headers = cacheResponse.headers
        for (i in 0 until headers.size) {
          val fieldName = headers.name(i)
          val value = headers.value(i)
          when {
            fieldName.equals("Date", ignoreCase = true) -> {
              servedDate = value.toHttpDateOrNull()
              servedDateString = value
            }
            fieldName.equals("Expires", ignoreCase = true) -> {
              expires = value.toHttpDateOrNull()
            }
            fieldName.equals("Last-Modified", ignoreCase = true) -> {
              lastModified = value.toHttpDateOrNull()
              lastModifiedString = value
            }
            fieldName.equals("ETag", ignoreCase = true) -> {
              etag = value
            }
            fieldName.equals("Age", ignoreCase = true) -> {
              ageSeconds = value.toNonNegativeInt(-1)}}}}}..... }Copy the code

Now, how does it return CacheStrategy

### CacheStrategy
// Use cacheResponse to return policies that satisfy the request.
fun compute(a): CacheStrategy {
      val candidate = computeCandidate()

      // Disable the use of insufficient network cache
      if(candidate.networkRequest ! =null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null.null)}return candidate
    }

    // Returns a policy that assumes the request can use the network.
    private fun computeCandidate(a): CacheStrategy {
      // No cached response
      if (cacheResponse == null) {
        return CacheStrategy(request, null)}// HTTPS but no handshake, network request
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)}// The network response cannot be cached
      if(! isCacheable(cacheResponse, request)) {return CacheStrategy(request, null)}// Cache control instruction
      // Do not use cache
      // Add header if-modified-since if-none-match
      val requestCaching = request.cacheControl
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)}val responseCaching = cacheResponse.cacheControl

      // Cache age
      val ageMillis = cacheResponseAge()
      // Cache validity period
      var freshMillis = computeFreshnessLifetime()

      
      if(requestCaching.maxAgeSeconds ! = -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      if(requestCaching.minFreshSeconds ! = -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      var maxStaleMillis: Long = 0
      if(! responseCaching.mustRevalidate && requestCaching.maxStaleSeconds ! = -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }

      If the response header does not require ignoring the local cache and the consolidated cache age is less than the consolidated expiration time, then the cache is available
      if(! responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning"."110 HttpURLConnection \"Response is stale\"")}val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning"."113 HttpURLConnection \"Heuristic expiration\"")}// The cache does not expire
        return CacheStrategy(null, builder.build())
      }

      // The cache has expired
      // Find Etag, lastModified, servedDate in the cache
      // Find the condition to add to the request. If the conditions are met, the response body is not sent.
      val conditionName: String
      val conditionValue: String?
      when{ etag ! =null -> {
          conditionName = "If-None-Match"conditionValue = etag } lastModified ! =null -> {
          conditionName = "If-Modified-Since"conditionValue = lastModifiedString } servedDate ! =null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }

        // Return request for none
        else -> return CacheStrategy(request, null) // No condition! Make a regular request.
      }

    // These parameters are added directly to the header file to make the conditional request
      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
      ConditionalRequest conditionalRequest conditionalRequest conditionalRequest conditionalRequest conditionalRequest conditionalRequest conditionalRequest conditionalRequest If yes, return to 304. If not, the network request will be executed.
      return CacheStrategy(conditionalRequest, cacheResponse)
    }
Copy the code

CacheStrategy handles the cache request process:

  1. No cache, HTTPS but no handshake, request no cache, ignore cache, or manually configure cache expiration are all direct network requests.

  2. If none of the above is true, then a cache is used (perhaps with a warning) if the cache is not expired.

  3. If the cache expires, but the response header has Etag, Last-Modified, Date, these headers are added for conditional network requests.

  4. If the cache expires and the response header is not set to Etag, last-Modified, Date, the network request is made.

One more question, where do you store cached responses?

No nonsense, just look at it, put it in the DiskLruCache.

### CacheInterceptor
valcacheCandidate = cache? .get(chain.request())

### Cahce
  internal fun get(request: Request): Response? {
    val key = key(request.url)
    val snapshot: DiskLruCache.Snapshot = try{ cache[key] ? :return null
    } catch (_: IOException) {
      return null // Give up because the cache cannot be read.}...return response
  }
Copy the code

The implementation of Okhttp caching should now be clear.

Connection pool interceptor

Finally, the core interceptor

What does the ConnectInterceptor do

### ConnectInterceptor

object ConnectInterceptor : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // Network is required to fulfill this requirement. Used to verify that the condition not GET is met
    valdoExtensiveHealthChecks = request.method ! ="GET"
    //Exchange real IO operations: write requests read responses
    //Transmitter is a Transmitter that sends its request from the application end to the network layer. It holds the connection, response and flow of the request. A request corresponds to a Transmitter instance and a data flow
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)

    return realChain.proceed(request, transmitter, exchange)
  }
}
Copy the code

Next, what has transmitter. NewExchange done

// Return a new exchange to carry the new request and response
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange { synchronized(connectionPool) { check(! noMoreExchanges) {"released" }
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"}}//exchangeFinde is responsible for the real IO operations - write request, read response
    valcodec = exchangeFinder!! .find(client, chain, doExtensiveHealthChecks)// Manage IO operations, which can be understood as data flows, are wrapped around Exchange Dec, adding event callbacks,
  // Each request corresponds to one Exchange instance. Pass it to the next interceptor, CallServerInterceptor
    val result = Exchange(this, call, eventListener, exchangeFinder!! , codec) synchronized(connectionPool) {this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result
    }
  }
Copy the code

ExchangeFinder

ExchangeFinder is primarily responsible for the real IO operations of write requests and read responses, essentially finding a TCP connection for the request.

###  ExchangeFinder 
// Find the connection and return it when it is healthy, and if it is not, repeat the process until a healthy connection is found
@Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      // Find the connection
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // If this is a new connection, you can skip the check and return directly
      synchronized(connectionPool) {
        if (candidate.successCount == 0) {
          return candidate
        }
      }

      // Unhealthy continue to find
      if(! candidate.isHealthy(doExtensiveHealthChecks)) {// The flag is not available
        candidate.noNewExchanges()
        continue
      }

      return candidate
    }
  }
Copy the code

Loop to find connections. If it is an unhealthy connection, mark it as not unavailable and continue looking.

### ExchangeFinder  
// Return a connection to host a new stream.
// If so, the existing connection is selected first, then the pool, and finally the new connection is established.
@Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      // The connection request was cancelled
      if (transmitter.isCanceled) throw IOException("Canceled")
      hasStreamFailure = false // This is a fresh attempt.

      // Assign real connections
      releasedConnection = transmitter.connection
      // A connection has been allocated, but it is marked as unavailable, so try to release the call
      toClose = if(transmitter.connection ! =null&& transmitter.connection!! .noNewExchanges) { transmitter.releaseConnectionNoEvents() }else {
        null
      }

      if(transmitter.connection ! =null) {
        // There is an allocated connection
        result = transmitter.connection
        releasedConnection = null
      }

      if (result == null) {
        // This will try to fetch from the connection pool (**********)
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null.false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if(nextRouteToTry ! =null) {
          // There are routes to try
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          // Returns true if the route used for the current connection should be retried, even if the connection itself is unhealthy.selectedRoute = transmitter.connection!! .route() } } }// Close the connection to be closedtoClose? .closeQuietly()if(releasedConnection ! =null) {
      // Callback the connection release event
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    if (foundPooledConnection) {
      // Call back to get the connection event
      eventListener.connectionAcquired(call, result!!)
    }
    if(result ! =null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }

    // Block the routing information
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null| |! routeSelection!! .hasNext())) { newRouteSelection =true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, try again to get connections from the pool. This may match due to join merge.routes = routeSelection!! .routesif (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      }

      // Create a new connection if the pool is not found
      if(! foundPooledConnection) {if (selectedRoute == null) {
          // The route is addedselectedRoute = routeSelection!! .next() }// Create a connection and assign it to this assignment immediately. This allows asynchronous Cancel () to interrupt our impending handshake
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // A second time, an available connection was found
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }

    // If not found the second time
    // To create a connection, set up a connection with the server through TCP + TLS handshakeresult!! .connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener )// Delete failure blacklist by Ip addressconnectionPool.routeDatabase.connected(result!! .route())var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // The last attempt at connection merging will only happen if we attempt multiple concurrent connections to the same host. (Make sure HTTP2 is multiplexed)
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // If so, close the connection we created and return the obtained connectionresult!! .noNewExchanges =truesocket = result!! .socket() result = transmitter.connection// The successful route can be used for the next attempt.
        nextRouteToTry = selectedRoute
      } else{ connectionPool.put(result!!) transmitter.acquireConnectionNoEvents(result!!) } } socket? .closeQuietly() eventListener.connectionAcquired(call, result!!)return result!!
  }
Copy the code

Find the connection, after three searches from the connection pool.

  1. If there is a connection, just use it
  2. There are no allocated connections to find in the connection pool,connectionPool.transmitterAcquirePooledConnectionSome also return directly
  3. If at first you don’t succeed, get the routing informationrouteSelector.next()If you have the Ip address and then get it again from the connection pool, it may match because of connection merge
  4. If the second attempt fails, create a connection and set up a connection with the server through a TCP+TLS handshake
  5. The last attempt is to ensure the multiplexing of the Http2.0 connection and close the link you just created
  6. If the third attempt fails, the newly created connection is pooled

ConnectionPool

HTTP requests with the same Address share a connection. ConnectioonPool implements connection multiplexing.

 ###  ConnectionPool
// The maximum number of idle connections is 5 and the maximum idle time is 5 minutes
class ConnectionPool(
  maxIdleConnections: Int,
  keepAliveDuration: Long,
  timeUnit: TimeUnit
) {
  internal val delegate = RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit)

  constructor() : this(5.5, TimeUnit.MINUTES)

  /** Returns the number of idle connections in the pool. */
  fun idleConnectionCount(a): Int = delegate.idleConnectionCount()

  /** Returns total number of connections in the pool. */
  fun connectionCount(a): Int = delegate.connectionCount()

  /** Close and remove all idle connections in the pool. */
  fun evictAll(a) {
    delegate.evictAll()
  }
}
Copy the code

Currently, the pool can hold up to five idle connections that will be ejected after five minutes of inactivity. You can see that the class that actually implements it is RealConnectionPool. So you know that this is where the actual management of the connection is within OKHTTP.

Let’s look at what happens when a connection is put into a connection pool:

###  RealConnectionPool
fun put(connection: RealConnection) {
    assert(Thread.holdsLock(this))
  // You can see that when you add the connection pool, you also start a cleanup thread
    if(! cleanupRunning) { cleanupRunning =true
      executor.execute(cleanupRunnable)
    }
    connections.add(connection)
  }

 private val cleanupRunnable = object : Runnable {
    override fun run(a) {
      // Loop cleanup
      while (true) {
        val waitNanos = cleanup(System.nanoTime())
        if (waitNanos == -1L) return
        try {
          // Wait until the next cleanup
          this@RealConnectionPool.lockAndWaitNanos(waitNanos)
        } catch (ie: InterruptedException) {
          // Will cause the thread to exit unless other connections are created!
          evictAll()
        }
      }
    }
  }
Copy the code

The reason for starting a cleanup thread is that the connection pool has a limit on the number of free connections and the maximum idle time, so you need to clean up if you don’t meet that limit. Now look at the cleanup() method

###  RealConnectionPool
// Perform maintenance on this pool and expel the longest idle connection if the connection exceeds the keepactive limit or the idle connection limit.
// Returns the sleep duration in nanos until the next scheduled call to this method.
// If no further cleanup is required, -1 is returned.
fun cleanup(now: Long): Long {
    var inUseConnectionCount = 0  // The number of connections being used
    var idleConnectionCount = 0		// Number of idle connections
    var longestIdleConnection: RealConnection? = null  // The connection with the longest idle time
    var longestIdleDurationNs = Long.MIN_VALUE  // The longest idle time

    synchronized(this) {
      // Find the number of connections to clean next time
      for (connection in connections) {
        // If the connection is in use, continue searching
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++
          continue
        }

        idleConnectionCount++

        // If the connection is ready to be evicted, we're done.
        val idleDurationNs = now - connection.idleAtNanos
         // Assign the longest idle time connection
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs
          longestIdleConnection = connection
        }
      }

      when {
        // If the maximum idle time is greater than 5 minutes or the number of idle times is greater than 5, it is removed
        longestIdleDurationNs >= this.keepAliveDurationNs
            || idleConnectionCount > this.maxIdleConnections -> {
          
          connections.remove(longestIdleConnection)
        }
        // Free connections exist, return how much time is left to clean up
        idleConnectionCount > 0- > {// A connection will be ready to evict soon.
          return keepAliveDurationNs - longestIdleDurationNs
        }
        // The connection in use exists
        inUseConnectionCount > 0- > {// All connections are in use. It's at least staying active for a period of time until we're running again.
          return keepAliveDurationNs
        }
        else- > {// No connection, no cleanup
          cleanupRunning = false
          return -1} } } longestIdleConnection!! .socket().closeQuietly()// Close remove red lotus root immediately clean
    return 0
  }
Copy the code

So to summarize

  1. If the maximum idle time is greater than 5 minutes or the idle connection is greater than 5 minutes, remove and turn off the maximum idle time.
  2. If there are free connections, return to the remaining 5 minutes
  3. If there are no idle connections, wait 5 minutes to clean up again
  4. No connection does not clean up
###  RealConnectionPool
// Check if there are any connections in use
private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
    val references = connection.transmitters
    var i = 0
    while (i < references.size) {
      val reference = references[i]

      if (reference.get() != null) {
        i++
        continue
      }

      // If there is any leakage from the TransmitterReference, remove it
      val transmitterRef = reference as TransmitterReference
      val message = "A connection to ${connection.route().address.url} was leaked. " +
          "Did you forget to close a response body?"
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace)

      references.removeAt(i)
      connection.noNewExchanges = true

      // If the connection queue is out of control, reset the idle time within 5 minutes
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs
        return 0}}// If not 0, the champions league is still in use
    return references.size
  }
Copy the code

For CCD size greater than 1, the connection is multiplexed by multiple requests.

How do I remove a connection from the pool?

### RealConnectionPool
// Get the connection for the corresponding address from the connection pool. If routes are not empty, you may get Http/2 connections because of the merge
fun transmitterAcquirePooledConnection(
    address: Address,
    transmitter: Transmitter,
    routes: List<Route>? , requireMultiplexed:Boolean
  ): Boolean {
    assert(Thread.holdsLock(this))
    for (connection in connections) {
      // Is not multiplexing
      if(requireMultiplexed && ! connection.isMultiplexed)continue
      if(! connection.isEligible(address, routes))continue
      transmitter.acquireConnectionNoEvents(connection)
      return true
    }
    return false
  }



Copy the code

There is one way to look at isEligible

### RealConnection
// Determine whether the connection can point to the data stream of address
  internal fun isEligible(address: Address, routes: List<Route>?: Boolean {
    // / The connection will no longer accept new data streams, false
    if (transmitters.size >= allocationLimit || noNewExchanges) return false

    // Match the non-host part of address
    if (!this.route.address.equalsNonHost(address)) return false

    // Match address's host, return true
    if (address.url.host == this.route().address.url.host) {
      return true // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    1. The connection must be HTTP/2.
    if (http2Connection == null) return false

    // 2. The IP address matches
    if (routes == null| |! routeMatchesAny(routes))return false

    // 3. Certificate matching
    if(address.hostnameVerifier ! == OkHostnameVerifier)return false
    if(! supportsUrl(address.url))return false

    // 4. Certificates pinning match
    try{ address.certificatePinner!! .check(address.url.host, handshake()!! .peerCertificates) }catch (_: SSLPeerUnverifiedException) {
      return false
    }

    return true // The caller's address can be carried by this connection.
  }
Copy the code

The process of fetching is to traverse the connection pool for a series of matches such as addresses.

At this point, okHttp has covered most of the key points.

Look at the source code with a problem

  1. Okhttp source code flow, thread pool
  2. Okhttp interceptor. AddInterceptor is different from addNetworkdInterceptor
  3. Okhttp responsibility chain mode
  4. What about Okhttp cache
  5. Okhttp connection pooling and Tcp multiplexing

Reference:

Are you familiar with OkHttp?

Interceptor Tip 1: Retry redirection, bridge, cache

OkHttp principle 8 questions