OkHttp request flow

The following code simply demonstrates the process of sending a POST request:

    // Imports needed by this snippet (OkHttp 4.x)
    import java.io.IOException
    import okhttp3.Call
    import okhttp3.Callback
    import okhttp3.FormBody
    import okhttp3.OkHttpClient
    import okhttp3.Request
    import okhttp3.Response

    // Send a POST request
    private fun sendPostRequest() {
        // The request address
        val address = "Request address"
        // Create the request body
        val requestBody = FormBody.Builder().let {
            it.add("username", "test")
            it.add("password", "123456")
            it.build()
        }
        // Create the request
        val request = Request.Builder().let {
            it.url(address)
            it.post(requestBody)
            it.build()
        }
        // Create the client
        val client = OkHttpClient()
        client.newCall(request = request).enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {
                // Logs is assumed to be the project's own logging helper
                Logs.e("Request error:$e")
            }

            override fun onResponse(call: Call, response: Response) {
                Logs.e("Request completed:${response.body!!.string()}")
            }
        })
    }

With the above code, we can send a POST request to the server.

The basic flow

This study note walks through the whole lifecycle of a request to get a general picture of what a network request goes through. The process is simple to describe; it is much like collecting a parcel: you take your pickup notice (the request body) to the pickup point (the request address) and get your parcel back (the response body).

The implementation, however, is more involved. Normally we first create a Request, attach a RequestBody to it, send the Request to the server through OkHttpClient, and receive the data the server returns. The notes below do not cover possible exceptions; they assume the request completes normally and focus on what OkHttpClient does along the way.

Create OkHttpClient

Create an OkHttpClient using the following code:

val client = OkHttpClient()

The OkHttpClient constructor is called with no arguments. Here is the source code for the method:

constructor() : this(Builder())

As you can see, the Builder pattern is used here: Builder() creates a builder whose parameters all hold default values, and those values are then used to build the OkHttpClient.

The important point is that OkHttpClient.Builder() creates a Builder with default parameters, and this Builder is then passed to OkHttpClient(Builder), which returns the created OkHttpClient object. The OkHttpClient(Builder) constructor also runs the init {} block to set some specific parameters.
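The delegation from the no-argument constructor to the Builder can be illustrated with a stripped-down sketch. MiniClient and its two settings are hypothetical stand-ins, not OkHttp's real fields; only the shape of the constructors mirrors what is described above.

```kotlin
// Hypothetical miniature of the OkHttpClient/Builder relationship.
class MiniClient internal constructor(builder: Builder) {
    // Values are copied out of the builder when the client is constructed.
    val connectTimeoutMillis: Int = builder.connectTimeoutMillis
    val retryOnFailure: Boolean = builder.retryOnFailure

    // The no-argument constructor simply delegates to a builder with defaults.
    constructor() : this(Builder())

    class Builder {
        internal var connectTimeoutMillis = 10_000
        internal var retryOnFailure = true

        fun connectTimeoutMillis(value: Int) = apply { connectTimeoutMillis = value }
        fun retryOnFailure(value: Boolean) = apply { retryOnFailure = value }
        fun build(): MiniClient = MiniClient(this)
    }
}

fun main() {
    val defaults = MiniClient()
    val custom = MiniClient.Builder().connectTimeoutMillis(3_000).build()
    println("${defaults.connectTimeoutMillis} ${custom.connectTimeoutMillis}") // 10000 3000
}
```

Either route ends in the same constructor, so a client built with defaults and a customised client are initialised by exactly the same code path.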

OkHttpClient.newCall()

After the OkHttpClient object is created, its newCall() method is called with the Request created above. Here is the source code of the method that client.newCall(request = request) calls:

// `this` refers to the current OkHttpClient
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

As you can see, this directly creates a RealCall object:

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  ......
}

The RealCall stores the current OkHttpClient object and the original Request (originalRequest).

RealCall.enqueue(Callback)

After obtaining the RealCall, the enqueue(Callback) method is executed. The source code for this method is as follows:

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

In this method, check() first verifies whether the current RealCall has already been executed and throws an exception if it has. The test itself is done with executed.compareAndSet(false, true). Here is the definition of the executed variable:

private val executed = AtomicBoolean()

As the definition shows, executed is an AtomicBoolean, so operations on it are atomic even when multiple threads are involved. compareAndSet(expect, update) means: if the current value of the AtomicBoolean equals expect, set it to update and return true; otherwise leave it unchanged and return false.

In the code above, we test whether executed is false and, if so, update it to true. Since this is the first time the method is called, executed still holds its initial value, false.

Once this line has run, the current RealCall is marked as executed.

Note that a RealCall that has already been executed cannot be executed again; a second attempt makes check() throw an IllegalStateException with the message "Already Executed".
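The execute-once guard can be tried in isolation. The sketch below is a minimal stand-in, assuming a hypothetical OneShotCall class that keeps only the AtomicBoolean check from RealCall.enqueue().

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for RealCall that only models the "execute once" guard.
class OneShotCall {
    private val executed = AtomicBoolean() // starts as false

    fun enqueue() {
        // compareAndSet returns true only for the first caller; check() throws
        // IllegalStateException with the given message when it returns false.
        check(executed.compareAndSet(false, true)) { "Already Executed" }
    }
}

fun main() {
    val call = OneShotCall()
    call.enqueue() // first call succeeds
    val second = runCatching { call.enqueue() }
    println(second.exceptionOrNull() is IllegalStateException) // true
}
```

Because the test and the update happen in one atomic step, two threads racing to enqueue the same call cannot both pass the check.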

RealCall.callStart()

If the current Call has not been executed yet, callStart() is called next. The source code for this method is as follows:

  private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
  }

As you can see, it first obtains a stack trace object from the current platform. According to the comments on getStackTraceForCloseable(), the method returns an object that holds the stack trace captured at the moment the method was called. It is intended specifically for java.io.Closeable objects and is used together with logCloseableLeak. The method returns either null or a Throwable.

After the stack trace object is obtained, eventListener.callStart(this) is called, where this is the current Call. Here is the declaration of the eventListener variable:

internal val eventListener: EventListener = client.eventListenerFactory.create(this)

As you can see, the object is created by taking the eventListenerFactory from the OkHttpClient and calling its create(Call) method. Here is how the variable is defined in OkHttpClient:

val eventListenerFactory: EventListener.Factory = builder.eventListenerFactory

Its value is taken directly from the corresponding field in OkHttpClient.Builder:

internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()

Go to EventListener:

    @JvmField
    val NONE: EventListener = object : EventListener() {
    }

As you can see, NONE is simply an EventListener object with no overrides. Back in OkHttpClient.Builder, asFactory() is called on it as EventListener.NONE.asFactory(). Here is the source code related to asFactory:

// Definition of EventListener.Factory
fun interface Factory {
  /**
   * Creates an instance of the [EventListener] for a particular [Call]. The returned
   * [EventListener] instance will be used during the lifecycle of [call].
   *
   * This method is invoked after [call] is created. See [OkHttpClient.newCall].
   *
   * **It is an error for implementations to issue any mutating operations on the [call] instance
   * from this method.**
   */
  fun create(call: Call): EventListener
}

// Definition of the asFactory extension function
fun EventListener.asFactory() = EventListener.Factory { this }

As you can see, EventListener.Factory has a single create(Call) method, which takes a Call and returns an EventListener. asFactory() is a Kotlin extension function on EventListener that builds such a Factory, whose create() simply returns the current EventListener.

EventListener is a class for monitoring the state of a Call. EventListener.Factory creates an EventListener for a Call through create(), or, put differently, binds a Call to an EventListener. The execution flow of the code above is therefore:

  • An EventListener is created through EventListener.NONE.
  • The extension function EventListener.asFactory creates an EventListener.Factory whose create() method returns the current EventListener.
  • RealCall calls create(this) on the EventListener.Factory, which binds the current Call to the EventListener, so that the Call can report its state through the EventListener.

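Note that EventListener.Factory is declared with the fun interface modifier. This marks it as a Kotlin functional (SAM) interface, that is, an interface with exactly one abstract method. An ordinary Kotlin interface cannot be instantiated from a lambda expression, but a fun interface can, which is why asFactory in the source code is able to create an EventListener.Factory with a lambda.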
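The combination of fun interface and an extension function can be reproduced in a few lines. Listener and ListenerFactory below are hypothetical miniatures of EventListener and EventListener.Factory, with a String standing in for the Call.

```kotlin
// Hypothetical miniature of EventListener.
open class Listener {
    open fun callStart(callName: String) {}
}

// "fun interface" declares a functional (SAM) interface: exactly one abstract method.
fun interface ListenerFactory {
    fun create(callName: String): Listener
}

// Same shape as asFactory(): a lambda that always returns the receiver.
fun Listener.asFactory() = ListenerFactory { this }

fun main() {
    val none = Listener()
    val factory = none.asFactory()
    println(factory.create("call-1") === none) // true
}
```

Every call to create() hands back the same listener instance, which matches the behaviour described for EventListener.NONE.asFactory().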

client.dispatcher.enqueue(AsyncCall(Callback))

After callStart() has run, client.dispatcher.enqueue(AsyncCall(responseCallback)) is executed next. Here is how client.dispatcher is defined:

val dispatcher: Dispatcher = builder.dispatcher

As you can see, the object is taken directly from OkHttpClient.Builder:

internal var dispatcher: Dispatcher = Dispatcher()

By default a new Dispatcher() is created. Of course, we can also supply our own Dispatcher through OkHttpClient.Builder.dispatcher(Dispatcher).

Next, an AsyncCall is created with AsyncCall(Callback). The class is declared as follows:

  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
       // Omit the contents of the class
  }

AsyncCall is an inner class of RealCall that implements the Runnable interface, which tells us it is meant to run on a worker thread.

After the AsyncCall object is created, Dispatcher.enqueue(AsyncCall) is called. Here is the source code for this method:

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

As you can see, a synchronized block is used first to avoid thread-safety problems. Inside it, the AsyncCall created in the previous step is added to readyAsyncCalls, which is a double-ended queue (ArrayDeque). Besides this one, the class declares two other double-ended queues:

  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

Next, the code checks whether the current AsyncCall belongs to a WebSocket call, as the annotated code below shows:

// Check whether the call is for a WebSocket
if (!call.call.forWebSocket)

// The first `call` is the AsyncCall that was passed in;
// `call.call` is the following property of AsyncCall:
    val call: RealCall
        get() = this@RealCall

// forWebSocket was specified when the RealCall was created.
// Its value is false, so the condition holds and the following is executed:
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)

Next, findExistingCallWithHost(call.host) looks for an existing AsyncCall that targets the same host.

The source code of AsyncCall.host is as follows:

    val host: String
      get() = originalRequest.url.host

As you can see, it returns originalRequest.url.host, where originalRequest is the Request passed in when the RealCall was created. Here is findExistingCallWithHost(call.host):

  private fun findExistingCallWithHost(host: String): AsyncCall? {
    for (existingCall in runningAsyncCalls) {
      if (existingCall.host == host) return existingCall
    }
    for (existingCall in readyAsyncCalls) {
      if (existingCall.host == host) return existingCall
    }
    return null
  }

As you can see, the search covers two of the queues above: runningAsyncCalls first, then readyAsyncCalls. Each is iterated in order, looking for a call whose host matches. Because the AsyncCall to be sent was already added to readyAsyncCalls, the search finds at least that call itself. (My personal understanding: other AsyncCalls to the same host may have been queued earlier. The current AsyncCall sits at the tail of readyAsyncCalls and iteration starts from the head, so any earlier AsyncCall is found first.)

Next, if the previous step found an existing AsyncCall, AsyncCall.reuseCallsPerHostFrom(existingCall) is called to reuse that call's counter. The source code is as follows:

    @Volatile var callsPerHost = AtomicInteger(0)

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

As you can see, the current call's callsPerHost is set to the callsPerHost object of the call that was found, so both now refer to the same AtomicInteger. If no earlier AsyncCall has the same host, the search ends up finding the current call itself, and the assignment simply sets callsPerHost to itself. Every later AsyncCall to the same host goes through the same lookup and adopts that same object, so all AsyncCalls with the same host share one callsPerHost.

Also note that callsPerHost is an AtomicInteger: it is a counter, namely the number of running calls to the same host.
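The point that the counter object is shared rather than copied is easy to verify. The sketch below assumes a hypothetical HostCall class that keeps only the host and the counter of AsyncCall.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for AsyncCall, keeping only the host and the counter.
class HostCall(val host: String) {
    @Volatile var callsPerHost = AtomicInteger(0)
        private set

    // Adopt the other call's counter object, not a copy of its value.
    fun reuseCallsPerHostFrom(other: HostCall) {
        this.callsPerHost = other.callsPerHost
    }
}

fun main() {
    val first = HostCall("example.com")
    val second = HostCall("example.com")
    second.reuseCallsPerHostFrom(first)

    first.callsPerHost.incrementAndGet()
    println(second.callsPerHost.get()) // 1, because both calls share one counter
}
```

An increment made through one call is immediately visible through the other, which is what lets the dispatcher count running calls per host without a separate map.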

promoteAndExecute()

After the callsPerHost of the AsyncCall has been set, promoteAndExecute() is executed:

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

This method iterates over the queue of waiting asynchronous calls and decides, for each one, whether it can be executed now:

  • If the number of calls currently running has reached the maximum allowed, maxRequests, the iteration stops. The default value is 64, and it can be changed.
  • If the host of the current request already has maxRequestsPerHost requests running (5 by default), the current request is skipped for now.

If the current request passes both checks, it is removed from readyAsyncCalls and the callsPerHost counter shared by the requests to the same host is incremented by 1. The request is then added to both executableCalls and runningAsyncCalls.

Next, we determine whether a request is executing by calling the runningCallsCount() method:

fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size

As you can see, it adds up the sizes of the running synchronous and asynchronous queues; any value above 0 means a request is still executing.

The request is then executed by traversing the executableCalls collection, calling the executeOn(executorService) method for each element (AsyncCall) in it.
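The promotion rules can be condensed into a small model. The sketch below is a simplified, single-threaded illustration, not OkHttp's Dispatcher: QueuedCall and MiniDispatcher are hypothetical names, there is no locking and no executor, and promoted calls are simply returned to the caller.

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for AsyncCall: a host plus a shareable counter.
class QueuedCall(val host: String) {
    var callsPerHost = AtomicInteger(0)
}

// Hypothetical single-threaded model of the promotion logic.
class MiniDispatcher(val maxRequests: Int = 64, val maxRequestsPerHost: Int = 5) {
    private val ready = ArrayDeque<QueuedCall>()
    private val running = ArrayDeque<QueuedCall>()

    fun enqueue(call: QueuedCall): List<QueuedCall> {
        ready.add(call)
        // Share the counter of any earlier call to the same host.
        val existing = (running + ready).firstOrNull { it.host == call.host }
        if (existing != null) call.callsPerHost = existing.callsPerHost
        return promote()
    }

    // Moves eligible calls from ready to running and returns the promoted ones.
    fun promote(): List<QueuedCall> {
        val promoted = mutableListOf<QueuedCall>()
        val i = ready.iterator()
        while (i.hasNext()) {
            val call = i.next()
            if (running.size >= maxRequests) break                      // global limit
            if (call.callsPerHost.get() >= maxRequestsPerHost) continue // per-host limit
            i.remove()
            call.callsPerHost.incrementAndGet()
            promoted += call
            running += call
        }
        return promoted
    }

    fun finished(call: QueuedCall): List<QueuedCall> {
        call.callsPerHost.decrementAndGet()
        running.remove(call)
        return promote()
    }
}

fun main() {
    val dispatcher = MiniDispatcher(maxRequestsPerHost = 2)
    val calls = List(3) { QueuedCall("example.com") }
    println(calls.map { dispatcher.enqueue(it).size }) // [1, 1, 0]: the third call waits
    println(dispatcher.finished(calls[0]).size)        // 1: the waiting call is promoted
}
```

With a per-host limit of 2, the third call to the same host stays in the ready queue until one of the running calls finishes.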

executorService

As the previous step shows, asynchronous tasks are ultimately run on a thread pool. executorService is defined as follows:

  private var executorServiceOrNull: ExecutorService? = null

  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

As you can see, it first checks whether executorServiceOrNull is null. If it is not, that ExecutorService is used directly. If it is, a ThreadPoolExecutor is created with zero core threads, a maximum of Int.MAX_VALUE threads, and a keep-alive time of 60 seconds for idle threads. Tasks are handed to threads through a SynchronousQueue, and the threads themselves are created by threadFactory().

We can also supply our own thread pool when creating the OkHttpClient, through the Dispatcher(ExecutorService) constructor. Here is the source code for that constructor:

  constructor(executorService: ExecutorService) : this() {
    this.executorServiceOrNull = executorService
  }

As you can see, the thread pool passed in is stored in this.executorServiceOrNull. When the executorService property is read later, executorServiceOrNull is not null, so the supplied thread pool is used directly.
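To get a feel for the default pool configuration, the following sketch builds a ThreadPoolExecutor with the same parameters using only the JDK. The function name newDispatcherStylePool and the thread name "demo Dispatcher" are placeholders chosen for this example; OkHttp's own threadFactory() helper is not used here.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Builds a pool with the same shape as the default one described above.
// The thread name "demo Dispatcher" is a placeholder, not OkHttp's real name.
fun newDispatcherStylePool(): ExecutorService =
    ThreadPoolExecutor(
        0, Int.MAX_VALUE,            // no core threads, effectively unbounded maximum
        60L, TimeUnit.SECONDS,       // idle threads are reclaimed after 60 seconds
        SynchronousQueue<Runnable>() // no buffering: each task is handed straight to a thread
    ) { runnable -> Thread(runnable, "demo Dispatcher") }

fun main() {
    val pool = newDispatcherStylePool()
    pool.execute { println("running on ${Thread.currentThread().name}") }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
}
```

A pool like this never queues work, so the practical limit on concurrency comes from the dispatcher's maxRequests and maxRequestsPerHost checks rather than from the pool itself.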

AsyncCall.executeOn(ExecutorService)

With the thread pool in place, the current request is executed by calling AsyncCall's executeOn(ExecutorService). Here is the source code for this method:

    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

This method first declares a variable, false by default, that records whether the call was successfully handed to the executor. The try block then calls executorService.execute(this). Because AsyncCall implements the Runnable interface, the thread pool will run AsyncCall's run() method on one of its threads. If the executor rejects the task, the failure is reported through responseCallback.onFailure() and the finally block tells the dispatcher that the call has finished.
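The rejection path is rarely seen in practice, but it can be provoked with an executor that has already been shut down. The sketch below assumes a hypothetical helper, submitOrFail, that follows the same success-flag pattern as executeOn(); the onFinished parameter stands in for client.dispatcher.finished(this).

```kotlin
import java.io.InterruptedIOException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException

// Hypothetical helper mirroring executeOn(): returns null on success,
// or the exception that would be reported to the callback on rejection.
fun submitOrFail(
    executor: ExecutorService,
    task: Runnable,
    onFinished: () -> Unit
): InterruptedIOException? {
    var success = false
    try {
        executor.execute(task)
        success = true
        return null
    } catch (e: RejectedExecutionException) {
        return InterruptedIOException("executor rejected").apply { initCause(e) }
    } finally {
        // Only clean up here when the task never started; otherwise run() does it.
        if (!success) onFinished()
    }
}

fun main() {
    val executor = Executors.newSingleThreadExecutor()
    executor.shutdown() // a shut-down executor rejects new tasks
    var cleanedUp = false
    val error = submitOrFail(executor, Runnable { }, onFinished = { cleanedUp = true })
    println("${error?.message} $cleanedUp") // executor rejected true
}
```

The success flag is what keeps the cleanup from running twice: when the task is accepted, cleanup is left to the task itself.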

AsyncCall.run()

After the above steps, we are ready to execute the run() method. The source code for this method is as follows:

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }

The threadName(name,block) method is executed first. The source code for this method is as follows:

inline fun threadName(name: String, block: () -> Unit) {
  val currentThread = Thread.currentThread()
  val oldName = currentThread.name
  currentThread.name = name
  try {
    block()
  } finally {
    currentThread.name = oldName
  }
}

This method temporarily renames the current thread. Because threads in the pool are reused, a new name is set for the duration of the block and the original name is restored afterwards.
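The rename-and-restore behaviour can be observed directly. The sketch below repeats the helper so that it is self-contained; the thread name passed in is an arbitrary example value.

```kotlin
// Same helper as in the source, repeated so the sketch is self-contained.
inline fun threadName(name: String, block: () -> Unit) {
    val currentThread = Thread.currentThread()
    val oldName = currentThread.name
    currentThread.name = name
    try {
        block()
    } finally {
        currentThread.name = oldName
    }
}

fun main() {
    val before = Thread.currentThread().name
    var during = ""
    threadName("OkHttp https://example.com/...") {
        during = Thread.currentThread().name
    }
    println(during)                                // OkHttp https://example.com/...
    println(Thread.currentThread().name == before) // true
}
```

The finally block guarantees that the original name comes back even when the block throws, so a pooled thread never keeps the name of a request it is no longer serving.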

getResponseWithInterceptorChain()

Once the setup above is done, getResponseWithInterceptorChain() is executed. It returns a Response, which means this is where the network request is actually performed. The source code is as follows:

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

In the method above, a list is created first to hold all interceptors. The interceptors configured on the OkHttpClient are added to it, followed by the interceptors that OkHttp defines itself. A RealInterceptorChain is then created, and chain.proceed(originalRequest) is called to obtain the final Response. If the request has not been cancelled, that Response is returned. Finally, the finally block runs and calls noMoreExchanges(null).
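How a list of interceptors turns into a single Response is covered in the next note, but the general idea of index-based chaining can be sketched with plain strings. MiniInterceptor and MiniChain are hypothetical types written for this illustration; they are not OkHttp's Interceptor or RealInterceptorChain, and the real chain carries considerably more state.

```kotlin
// Hypothetical miniature interceptor; requests and responses are plain strings.
fun interface MiniInterceptor {
    fun intercept(chain: MiniChain): String
}

class MiniChain(
    private val interceptors: List<MiniInterceptor>,
    private val index: Int,
    val request: String
) {
    // Hands the request to the interceptor at `index`, giving it a chain that starts at index + 1.
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor left to produce a response" }
        val next = MiniChain(interceptors, index + 1, request)
        return interceptors[index].intercept(next)
    }
}

fun main() {
    val interceptors = listOf(
        // Rewrites the request on the way in.
        MiniInterceptor { chain -> chain.proceed(chain.request + " +headers") },
        // Wraps the response on the way out.
        MiniInterceptor { chain -> "[" + chain.proceed(chain.request) + "]" },
        // The last interceptor never calls proceed(); it produces the response.
        MiniInterceptor { chain -> "response to '${chain.request}'" }
    )
    val chain = MiniChain(interceptors, index = 0, request = "GET /")
    println(chain.proceed("GET /")) // [response to 'GET / +headers']
}
```

Each interceptor sees the request on the way in and the response on the way out, and the last one in the list is the only one that does not call proceed().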

Once the Response is available, it is returned to the run() method above, which calls responseCallback.onResponse(RealCall, Response) to deliver the data to the Callback we passed in earlier. Lastly, client.dispatcher.finished(this) in the finally block is executed. The source code of finished() is as follows:

  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

As the method name suggests, it is called after a request has finished. It first decrements the shared callsPerHost counter, removes the current AsyncCall from the running queue, and then, through promoteAndExecute(), checks whether any requests are still in progress. If none are and idleCallback has been set, idleCallback.run() is executed. idleCallback is a variable of type Runnable that we can set in order to run some work once all requests have finished.

Conclusion

At this point the basic request flow of OkHttp is much clearer. These notes covered how an OkHttpClient is created; what happens between calling okHttpClient.newCall().enqueue(Callback) and finally receiving the Response; what role EventListener plays along the way; and how to plug in your own project's thread pool. How the interceptors are handled will be covered in the next note.