OkHttp source code analysis (1)


Integration

Add the latest version of [OkHttp][https://github.com…] to the [Gradle][gradle.org] build.

dependencies {
    implementation "com.squareup.okhttp3:okhttp:4.9.0"
}

Usage

Initialization

    // Initialize the client (the TIMEOUT_* values are millisecond constants defined elsewhere)
    val httpClient: OkHttpClient = OkHttpClient.Builder()
        .callTimeout(Duration.ofMillis(TIMEOUT_CALL))
        .connectTimeout(Duration.ofMillis(TIMEOUT_CONNECT))
        .readTimeout(Duration.ofMillis(TIMEOUT_READ))
        .writeTimeout(Duration.ofMillis(TIMEOUT_WRITE))
        .build()

    // Build the request
    val request = Request.Builder()
        .url("https://github.com/api")
        .header("Content-Type", "application/json")
        .build()

    // Create the Call
    val call = httpClient.newCall(request)

For a synchronous request:

    // execute() blocks the calling thread; use {} closes the response afterwards
    call.execute().use { response -> TODO("Do something") }

For an asynchronous request:

    call.enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            TODO("Not yet implemented")
        }

        override fun onResponse(call: Call, response: Response) {
            TODO("Not yet implemented")
        }
    })

Related classes

OkHttpClient
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
    // Manages asynchronous calls; owns an internal thread pool
    val dispatcher: Dispatcher

    // HTTP connection pool, used for connection reuse
    val connectionPool: ConnectionPool

    // Application interceptors and network interceptors
    val interceptors: List<Interceptor>
    val networkInterceptors: List<Interceptor>

    // Parameters
    val connectTimeoutMillis: Int
    val readTimeoutMillis: Int
    val writeTimeoutMillis: Int

    // Other members omitted
}

The dispatcher in the OkHttpClient class manages asynchronous tasks and reuses a thread pool, while connectionPool reuses HTTP connections. Configuring these two resources properly is what gives OkHttp its high concurrency and low resource consumption.

OkHttpClient also implements the Call.Factory interface, which declares the factory method for Call objects:

  fun interface Factory {
    fun newCall(request: Request): Call
  }

newCall returns a RealCall object. The Call interface extends Cloneable, but OkHttpClient does not pool Call objects: every newCall creates a new RealCall.

  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
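To illustrate the factory relationship, here is a minimal self-contained sketch. MiniCall, MiniCallFactory and MiniClient are hypothetical stand-ins rather than OkHttp types; the only point is that each newCall invocation returns a fresh object instead of a pooled one.

```kotlin
// Hypothetical stand-ins for Call, Call.Factory and OkHttpClient (not OkHttp's real types).
class MiniCall(val url: String)

fun interface MiniCallFactory {
    fun newCall(url: String): MiniCall
}

class MiniClient : MiniCallFactory {
    // Like OkHttpClient.newCall, this builds a new object on every invocation.
    override fun newCall(url: String): MiniCall = MiniCall(url)
}

fun main() {
    val client = MiniClient()
    val first = client.newCall("https://github.com/api")
    val second = client.newCall("https://github.com/api")
    println(first === second) // false: the two calls are distinct instances
}
```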
Request

Request is an abstraction of an HTTP request.

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
) {
}

Notes

  • The method field supports GET, POST, HEAD, PUT, PATCH, and DELETE
  • If the body field is null or is itself immutable, the Request object is immutable
Response

Response is an abstraction of an HTTP response.

class Response internal constructor(
    val request: Request,
    val message: String,
    val code: Int,
    val headers: Headers,
    val body: ResponseBody?,
    ...
)

Notes

  • The object is not immutable, because the body field is a one-shot value: it can be consumed only once, after which it is closed. All other properties are immutable.
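The one-shot behaviour of the body can be sketched as follows. OneShotBody is a hypothetical model, not OkHttp's ResponseBody, and it assumes that a second read should simply fail.

```kotlin
// Hypothetical model of a one-shot body (not OkHttp's ResponseBody).
class OneShotBody(private val content: String) {
    private var consumed = false

    /** Returns the content on the first call and fails on any later call. */
    fun string(): String {
        check(!consumed) { "closed" }
        consumed = true
        return content
    }
}

fun main() {
    val body = OneShotBody("{\"ok\":true}")
    println(body.string())
    // The second read fails because the body has already been consumed.
    println(runCatching { body.string() }.isFailure)
}
```

This is why a response body that must be inspected more than once is usually read into a String or ByteArray first.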
Call

Call is a request ready to be executed.

interface Call : Cloneable {

    fun request(): Request

    fun execute(): Response

    fun enqueue(responseCallback: Callback)

    fun cancel()

    fun isExecuted(): Boolean

    fun isCanceled(): Boolean

    fun timeout(): Timeout

    public override fun clone(): Call

}

The methods above show the following:

  1. A Call can be cancelled.
  2. A Call exposes two states, executed and canceled, and it cannot be executed twice.
  3. A Call has a timeout, so its execution time, or its waiting time in the queue, is limited.
  4. Call extends Cloneable, which might suggest that instances are pooled and reused (the flyweight pattern), but in fact they are not.
  5. Call is only an interface; the implementation is RealCall, with asynchronous execution handled by its inner class AsyncCall.
Dispatcher

class Dispatcher constructor() {
    // Maximum number of requests executing concurrently
    var maxRequests = 64

    // Maximum number of requests executing concurrently per host
    var maxRequestsPerHost = 5

    // Thread pool supplied from outside, if any
    var executorServiceOrNull: ExecutorService? = null

    // Thread pool actually used; created lazily when none was supplied
    val executorService: ExecutorService
        get() {
            if (executorServiceOrNull == null) {
                executorServiceOrNull = ThreadPoolExecutor(
                    0, Int.MAX_VALUE,
                    60,
                    TimeUnit.SECONDS,
                    SynchronousQueue(),
                    threadFactory("$okHttpName Dispatcher", false))
            }
            return executorServiceOrNull!!
        }

    // Asynchronous calls waiting to run
    val readyAsyncCalls = ArrayDeque<AsyncCall>()

    // Asynchronous calls currently running
    val runningAsyncCalls = ArrayDeque<AsyncCall>()

    // Synchronous calls currently running
    val runningSyncCalls = ArrayDeque<RealCall>()
}

Dispatcher manages asynchronous requests. If the user does not supply an external thread pool, it creates its own internal one, constructed as follows:

executorServiceOrNull = ThreadPoolExecutor(
    0, Int.MAX_VALUE,
    60,
    TimeUnit.SECONDS,
    SynchronousQueue(),
    threadFactory("$okHttpName Dispatcher", false))
  • The core pool size is 0, so no threads are kept resident.

  • The maximum pool size is Int.MAX_VALUE, but the actual number of threads is bounded by maxRequests and maxRequestsPerHost.

  • 60, TimeUnit.SECONDS: a thread that stays idle for 60 seconds is destroyed.

  • The work queue is a SynchronousQueue, which holds no elements: each submitted task is handed directly to a thread, so a new thread is started whenever none is idle. This is important.

  • The last argument is the thread factory, which names the threads.
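The effect of these parameters can be checked with a small sketch that uses only java.util.concurrent. The helper names newDispatcherLikePool and threadsStartedFor are hypothetical, and the default thread factory is used instead of OkHttp's threadFactory helper.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

/** Builds a pool with the same sizing parameters as the Dispatcher's default executor. */
fun newDispatcherLikePool(): ThreadPoolExecutor =
    ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue())

/** Submits [taskCount] blocking tasks and reports how many worker threads the pool created. */
fun threadsStartedFor(taskCount: Int): Int {
    val pool = newDispatcherLikePool()
    val release = CountDownLatch(1)
    try {
        // Every task blocks, so no worker is idle when the next task arrives.
        repeat(taskCount) { pool.execute { release.await() } }
        return pool.poolSize
    } finally {
        release.countDown()
        pool.shutdown()
    }
}

fun main() {
    // With a SynchronousQueue nothing is buffered: three busy tasks need three threads.
    println(threadsStartedFor(3))
}
```

Because the pool itself never queues work, the limiting is left entirely to the Dispatcher's own ready and running queues.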

Synchronous Flow Analysis

Synchronous calls are simpler: they run on the calling thread without spawning a worker thread, so calling call.execute() directly from the UI thread blocks it. The concrete implementation of the Call interface here is RealCall, whose execute method is implemented as follows:

  override fun execute(): Response {
    // An atomic flag ensures that a Call is executed only once
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    // Start the timer; the call is cancelled if it times out
    timeout.enter()
    // Event callback and call-stack capture
    callStart()

    try {
      // Record the call in the running synchronous queue
      client.dispatcher.executed(this)
      // Core method: runs the interceptor chain, which sends the request and returns the response
      return getResponseWithInterceptorChain()
    } finally {
      // Remove the call from the queue
      client.dispatcher.finished(this)
    }
  }

Each RealCall instance holds an atomic flag:

private val executed = AtomicBoolean()

The first thing execute does is call executed.compareAndSet(false, true), an atomic compare-and-set that acts as a guard so that each RealCall can be executed only once. The same atomic flag is read to determine whether a RealCall has been executed:

override fun isExecuted(): Boolean = executed.get()
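The once-only guard can be reproduced in isolation. OnceOnlyCall is a hypothetical class that keeps only the AtomicBoolean logic and returns a placeholder string instead of a real response.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical class that mirrors only the once-only guard of RealCall.
class OnceOnlyCall {
    private val executed = AtomicBoolean()

    fun execute(): String {
        // compareAndSet succeeds only for the first caller, even under contention.
        check(executed.compareAndSet(false, true)) { "Already Executed" }
        return "response"
    }

    fun isExecuted(): Boolean = executed.get()
}

fun main() {
    val call = OnceOnlyCall()
    println(call.execute())
    println(call.isExecuted())
    // A second execute() fails with IllegalStateException("Already Executed").
    println(runCatching { call.execute() }.exceptionOrNull()?.message)
}
```

Compare-and-set needs no lock: the check and the update happen as one atomic step, so two threads cannot both see false.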

After passing this guard, RealCall starts a timer so that the call is cancelled automatically if it times out. This is implemented by its internal AsyncTimeout object:

  private val timeout = object : AsyncTimeout() {
    override fun timedOut() {
      // Cancel the call because it timed out
      cancel()
    }
  }.apply {
    // Set the call timeout
    timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
  }
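The enter/timedOut idea can be sketched with a scheduled task. This is only an illustration under stated assumptions: CallWatchdog and timedOutWhileRunning are hypothetical names, Okio's AsyncTimeout is implemented differently, and here the timeout merely sets a flag instead of cancelling real I/O.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical watchdog; Okio's AsyncTimeout works differently internally.
class CallWatchdog(private val timeoutMillis: Long, private val onTimeout: () -> Unit) {
    private val scheduler = Executors.newSingleThreadScheduledExecutor { r ->
        Thread(r, "watchdog").apply { isDaemon = true }
    }
    private var pending: ScheduledFuture<*>? = null

    /** Starts the timer; onTimeout fires unless exit() is called first. */
    fun enter() {
        pending = scheduler.schedule(Runnable { onTimeout() }, timeoutMillis, TimeUnit.MILLISECONDS)
    }

    /** Stops the timer once the work has finished. */
    fun exit() {
        pending?.cancel(false)
        scheduler.shutdownNow()
    }
}

/** Runs [work] under the watchdog and reports whether the timeout fired. */
fun timedOutWhileRunning(timeoutMillis: Long, work: () -> Unit): Boolean {
    val canceled = AtomicBoolean(false)
    val watchdog = CallWatchdog(timeoutMillis) { canceled.set(true) }
    watchdog.enter()
    try {
        work()
    } finally {
        watchdog.exit()
    }
    return canceled.get()
}

fun main() {
    println(timedOutWhileRunning(50) { Thread.sleep(300) }) // slow work: the timeout fires
    println(timedOutWhileRunning(5_000) { })                // fast work: it does not
}
```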

Once the timer is started, the Dispatcher adds the call to the running synchronous queue.

  /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    // Running synchronous calls
    runningSyncCalls.add(call)
  }

At this point, the Dispatcher has recorded the call as running.

When the run ends, the call is removed from the queue, whether it succeeded, threw an exception, or timed out.

// Dispatcher
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      // If the call is not in the queue, an error is raised
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
    // Promote waiting asynchronous calls and learn whether anything is still running
    val isRunning = promoteAndExecute()

    // Idle callback
    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

In this code, finished(call: RealCall) calls finished(calls: Deque<T>, call: T), passing the synchronous queue runningSyncCalls as an argument. Inside finished(calls: Deque<T>, call: T), the executed RealCall is removed from the queue with calls.remove(call).

Note that after a synchronous request ends, the Dispatcher proactively checks the asynchronous queue for pending asynchronous requests. This is done by the promoteAndExecute() method. If the Dispatcher has no running calls at that point, idleCallback is run.
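The bookkeeping for synchronous calls can be modelled in a few lines. MiniSyncDispatcher is a hypothetical simplification: calls are plain strings, and the promotion of asynchronous calls is left out, so "still running" only looks at the synchronous queue.

```kotlin
import java.util.ArrayDeque

// Hypothetical, simplified model of the Dispatcher's bookkeeping for synchronous calls.
class MiniSyncDispatcher {
    private val runningSyncCalls = ArrayDeque<String>()
    var idleCallback: Runnable? = null

    @Synchronized fun executed(call: String) {
        runningSyncCalls.add(call)
    }

    fun finished(call: String) {
        val callback: Runnable?
        val stillRunning: Boolean
        synchronized(this) {
            // Removing a call that was never recorded is a programming error.
            if (!runningSyncCalls.remove(call)) throw AssertionError("Call wasn't in-flight!")
            callback = idleCallback
            stillRunning = runningSyncCalls.isNotEmpty()
        }
        // Run the idle callback outside the lock, and only when nothing is left.
        if (!stillRunning && callback != null) callback.run()
    }
}

/** Records the order of events for two overlapping calls. */
fun idleRunsAfterLastCall(): List<String> {
    val events = mutableListOf<String>()
    val dispatcher = MiniSyncDispatcher()
    dispatcher.idleCallback = Runnable { events += "idle" }
    dispatcher.executed("a")
    dispatcher.executed("b")
    dispatcher.finished("a")
    events += "finished a"
    dispatcher.finished("b")
    events += "finished b"
    return events
}

fun main() {
    println(idleRunsAfterLastCall()) // [finished a, idle, finished b]
}
```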

In a synchronous request, RealCall obtains the final response through the getResponseWithInterceptorChain method. Internally, the method assembles a series of interceptors into a chain of responsibility, which ultimately performs the network access and returns the server's response. Interceptors are at the heart of OkHttp: several built-in interceptors encapsulate its core functionality.

The flow chart is as follows:

    graph TD
        start([start]) --> pushQueue
        pushQueue --> startTimeout
        startTimeout --> timeout{Timeout}
        timeout --Yes--> cancelCall
        cancelCall --> stop
        startTimeout --> running[interceptors connect to the server for a response]
        running --> rmQueue[remove from the synchronous queue]
        rmQueue --> scanQueue[scan the synchronous and asynchronous queues and execute queued requests]
        scanQueue --> stop([end])
The interceptor

The Interceptor interface is:

fun interface Interceptor {
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response

  companion object {
    // Builds an Interceptor from a lambda
    inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
        Interceptor { block(it) }
  }

  interface Chain {
    fun request(): Request

    @Throws(IOException::class)
    fun proceed(request: Request): Response

    fun connection(): Connection?

    fun call(): Call
    ....
  }
}

The parameter of Interceptor.intercept is a Chain object. Its request() method returns the request as modified by the previous interceptor, and its proceed(request) method passes the request modified by the current interceptor to the next interceptor and returns the response. This is how the chain of responsibility pattern is implemented.
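What a single interceptor does with request() and proceed() can be sketched with miniature types. Everything here (MiniRequest, MiniResponse, MiniChain, MiniInterceptor, TerminalChain) is hypothetical and far simpler than the OkHttp classes; the terminal chain stands in for the rest of the chain plus the server.

```kotlin
// Hypothetical miniature types; OkHttp's Request, Response and Interceptor are richer.
data class MiniRequest(val url: String, val headers: Map<String, String> = emptyMap())
data class MiniResponse(val code: Int, val request: MiniRequest)

interface MiniChain {
    fun request(): MiniRequest
    fun proceed(request: MiniRequest): MiniResponse
}

fun interface MiniInterceptor {
    fun intercept(chain: MiniChain): MiniResponse
}

// Reads the incoming request, adds a header, and hands the modified copy onward.
val userAgentInterceptor = MiniInterceptor { chain ->
    val modified = chain.request().let { it.copy(headers = it.headers + ("User-Agent" to "demo")) }
    chain.proceed(modified)
}

// A terminal chain that echoes back whatever request it finally receives.
class TerminalChain(private val original: MiniRequest) : MiniChain {
    override fun request(): MiniRequest = original
    override fun proceed(request: MiniRequest): MiniResponse = MiniResponse(200, request)
}

fun main() {
    val response = userAgentInterceptor.intercept(TerminalChain(MiniRequest("https://github.com/api")))
    println(response.request.headers) // {User-Agent=demo}
}
```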

As the synchronous analysis showed, RealCall ultimately obtains the response through the getResponseWithInterceptorChain method.

  internal fun getResponseWithInterceptorChain(): Response {
    // Collect all interceptors into one list
    val interceptors = mutableListOf<Interceptor>()
    // User-defined application interceptors
    interceptors += client.interceptors
    // Built-in interceptors that implement the core functionality
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    // The core component of the chain of responsibility: the Interceptor.Chain implementation
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      // The chain of responsibility finally yields the response
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

The core of getResponseWithInterceptorChain is to combine the user-defined interceptors with the built-in ones, create a RealInterceptorChain instance, and call its proceed method, which starts the chain of responsibility. The proceed implementation of RealInterceptorChain is as follows:

  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    // Guard against running past the end of the interceptor list
    check(index < interceptors.size)
    calls++
    // exchange is null for application interceptors, so these checks apply only to network interceptors
    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Create the chain that the current interceptor will receive
    val next = copy(index = index + 1, request = request)
    // Get the current interceptor
    val interceptor = interceptors[index]

    // Invoke it with the next chain
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }
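The index-advancing mechanism of proceed can be isolated in a small sketch. StepChain and StepInterceptor are hypothetical miniatures in which requests and responses are plain strings; the exchange and call-count checks of the real class are omitted.

```kotlin
// Hypothetical miniature of RealInterceptorChain; requests and responses are plain strings.
fun interface StepInterceptor {
    fun intercept(chain: StepChain): String
}

class StepChain(
    private val interceptors: List<StepInterceptor>,
    private val index: Int,
    val request: String
) {
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor left to handle the request" }
        // The next chain points one position further and carries the latest request.
        val next = StepChain(interceptors, index + 1, request)
        return interceptors[index].intercept(next)
    }
}

fun runChain(request: String): String {
    val interceptors = listOf(
        StepInterceptor { chain -> chain.proceed(chain.request + " >retry") + " <retry" },
        StepInterceptor { chain -> chain.proceed(chain.request + " >bridge") + " <bridge" },
        // The last interceptor does not call proceed; it produces the response itself.
        StepInterceptor { chain -> "[response to '${chain.request}']" }
    )
    return StepChain(interceptors, 0, request).proceed(request)
}

fun main() {
    // Requests travel down the list and responses travel back up in reverse order.
    println(runChain("GET")) // [response to 'GET >retry >bridge'] <bridge <retry
}
```

Each interceptor sees the request on the way in and the response on the way out, which is why the order of the list matters.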

From the source code, you can see that OkHttp’s core functionality is encapsulated in the following interceptors

  1. RetryAndFollowUpInterceptor
  2. BridgeInterceptor
  3. CacheInterceptor
  4. ConnectInterceptor
  5. CallServerInterceptor

How each interceptor is implemented is left for later study; this article focuses only on the overall flow.

Asynchronous Flow Analysis

The asynchronous flow is roughly the same as the synchronous one. The main difference is that the Dispatcher manages asynchronous requests. Again, the steps start with an external call.

    call.enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            TODO("Not yet implemented")
        }

        override fun onResponse(call: Call, response: Response) {
            TODO("Not yet implemented")
        }
    })

After call.enqueue(callback) is invoked, the Dispatcher adds an AsyncCall object to the asynchronous queue.

  override fun enqueue(responseCallback: Callback) {
    // Atomic guard: a call can be executed only once
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    // Event callback and call-stack capture
    callStart()
    // Queue an AsyncCall object, not the RealCall itself
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

The AsyncCall object does not implement the Call interface; it is a Runnable. This is where the Dispatcher's thread pool comes in.

  inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {}

The enqueue logic for asynchronous requests in the Dispatcher class is as follows:

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      // Add the call to the ready queue first. Because of the limits on total requests
      // and on requests per host, a queued call cannot always run immediately.
      readyAsyncCalls.add(call)

      // Share the per-host counter with an existing call to the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    // Promote eligible calls from the ready queue and execute them
    promoteAndExecute()
  }

After the call joins the ready queue, promoteAndExecute filters the queue and executes the calls that satisfy the limits.

  private fun promoteAndExecute(): Boolean {
    // Assert that the current thread does not hold the Dispatcher lock
    this.assertThreadDoesntHoldLock()
    // Calls selected for execution
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        // The total number of running requests has reached the limit
        if (runningAsyncCalls.size >= this.maxRequests) break
        // The number of running requests to this host has reached the limit
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
        // The call is eligible: remove it from the ready queue
        i.remove()
        // Add it to the running asynchronous queue
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    // Execute the eligible asynchronous calls
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }
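The two limits can be exercised in a simplified model. MiniAsyncDispatcher is hypothetical: a call is identified only by its host name, nothing is actually executed, and the model is single-threaded, so the synchronization of the real Dispatcher is omitted.

```kotlin
// Hypothetical model: a call is just its host name; limits are constructor parameters.
class MiniAsyncDispatcher(private val maxRequests: Int, private val maxRequestsPerHost: Int) {
    private val ready = ArrayDeque<String>()
    private val running = ArrayDeque<String>()

    fun enqueue(host: String): List<String> {
        ready.addLast(host)
        return promote()
    }

    fun finished(host: String): List<String> {
        check(running.remove(host)) { "Call wasn't in-flight!" }
        return promote()
    }

    /** Moves eligible calls from ready to running and returns the hosts just promoted. */
    private fun promote(): List<String> {
        val promoted = mutableListOf<String>()
        val i = ready.iterator()
        while (i.hasNext()) {
            val host = i.next()
            if (running.size >= maxRequests) break // global limit reached
            if (running.count { it == host } >= maxRequestsPerHost) continue // host limit reached
            i.remove()
            running.addLast(host)
            promoted += host
        }
        return promoted
    }

    fun readyCount(): Int = ready.size
    fun runningCount(): Int = running.size
}

fun main() {
    val dispatcher = MiniAsyncDispatcher(maxRequests = 2, maxRequestsPerHost = 1)
    println(dispatcher.enqueue("a.example"))  // [a.example]
    println(dispatcher.enqueue("a.example"))  // []: the host limit holds it back
    println(dispatcher.enqueue("b.example"))  // [b.example]
    println(dispatcher.enqueue("c.example"))  // []: the global limit holds it back
    println(dispatcher.finished("a.example")) // [a.example]: the waiting call is promoted
}
```

Note the difference between break and continue: once the global limit is hit nothing further can run, whereas a host at its limit only blocks calls to that host.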

Each eligible asynchronous call is then executed with:

asyncCall.executeOn(executorService)

The executorService is the thread pool analyzed in the Dispatcher section, and AsyncCall is a Runnable. The executeOn method is as follows:

    fun executeOn(executorService: ExecutorService) {
      // Assert that the current thread does not hold the Dispatcher lock
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        // Hand this Runnable to the thread pool
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        // The pool rejected the task, so the request fails
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        // If submission failed, the call is no longer running: remove it from the Dispatcher
        if (!success) {
          client.dispatcher.finished(this)
        }
      }
    }

The core of this method is the call to executorService.execute(this), where this is the Runnable AsyncCall. The method also handles rejection: if no RejectedExecutionException is thrown, the call was submitted to the thread pool successfully; otherwise the failure is reported through onFailure and the call is removed from the Dispatcher.
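The rejection path can be reproduced with a plain executor. The helpers submitOrFail and rejectionDemo are hypothetical; the sketch relies only on the documented behaviour that an executor which has been shut down rejects new tasks with RejectedExecutionException.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException

/** Tries to submit [task]; reports rejection through [onFailure] instead of throwing. */
fun submitOrFail(executor: ExecutorService, task: Runnable, onFailure: (Exception) -> Unit): Boolean {
    var success = false
    try {
        executor.execute(task)
        success = true
    } catch (e: RejectedExecutionException) {
        onFailure(e)
    }
    return success
}

/** Returns (accepted before shutdown, rejected and reported after shutdown). */
fun rejectionDemo(): Pair<Boolean, Boolean> {
    val executor = Executors.newSingleThreadExecutor()
    val accepted = submitOrFail(executor, Runnable { }, { })
    executor.shutdown()
    var failureReported = false
    // A shut-down executor rejects new tasks with RejectedExecutionException.
    val acceptedAfterShutdown = submitOrFail(executor, Runnable { }, { failureReported = true })
    return Pair(accepted, !acceptedAfterShutdown && failureReported)
}

fun main() {
    println(rejectionDemo()) // (true, true)
}
```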

Next, look at the core method of the Runnable, run:

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        // Start the timer so the call is cancelled automatically on timeout
        timeout.enter()
        try {
          // Get the response from the interceptor chain
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }

The logic here is similar to that of a synchronous request:

  1. An atomic guard, applied earlier in enqueue, prevents a request from being executed more than once

  2. A timer is started, and the call is cancelled if it expires

  3. The response is obtained through getResponseWithInterceptorChain

The difference is that the result is delivered to the caller through a callback.

Finally, the asynchronous call is removed from the queue via client.dispatcher.finished(this). As with synchronous requests, promoteAndExecute runs at removal time to check whether any waiting asynchronous call is now eligible to execute.
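The callback handling in run can be sketched without any networking. MiniCallback, runAsync and outcomeOf are hypothetical names; fetch stands in for getResponseWithInterceptorChain, and finished stands in for the Dispatcher bookkeeping.

```kotlin
import java.io.IOException

// Hypothetical callback with string responses instead of OkHttp's Response.
interface MiniCallback {
    fun onResponse(response: String)
    fun onFailure(e: IOException)
}

/** Runs [fetch], reports the outcome to [callback] at most once, then always calls [finished]. */
fun runAsync(fetch: () -> String, callback: MiniCallback, finished: () -> Unit) {
    var signalledCallback = false
    try {
        val response = fetch()
        signalledCallback = true
        callback.onResponse(response)
    } catch (e: IOException) {
        // If onResponse itself threw, the callback was already signalled: do not call onFailure.
        if (!signalledCallback) callback.onFailure(e)
    } finally {
        finished()
    }
}

/** Collects the sequence of events produced for a given [fetch]. */
fun outcomeOf(fetch: () -> String): List<String> {
    val events = mutableListOf<String>()
    val callback = object : MiniCallback {
        override fun onResponse(response: String) { events += "onResponse:$response" }
        override fun onFailure(e: IOException) { events += "onFailure:${e.message}" }
    }
    runAsync(fetch, callback) { events += "finished" }
    return events
}

fun main() {
    println(outcomeOf { "ok" })                      // [onResponse:ok, finished]
    println(outcomeOf { throw IOException("boom") }) // [onFailure:boom, finished]
}
```

In both outcomes the finished step runs last, which is what lets the Dispatcher promote the next waiting call.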