Synchronize task thread scheduling

Synchronization task thread scheduling is relatively simple

  • Client. The dispatcher. Executed (this) synchronous execution, place the task to runningSyncCalls queue
  • GetResponseWithInterceptorChain began to perform a task
  • Client. The dispatcher. The finished (this) over
try {
  client.dispatcher.executed(this)
  return getResponseWithInterceptorChain()
} finally {
  client.dispatcher.finished(this)}Copy the code

There is an idleCallback in the FINISHED method and I don't understand its function.

The promoteAndExecute() method is called to see if there is an asynchronous task, and if there is an asynchronous task, let’s do it again.

Asynchronous task thread scheduling

The trigger

The entry is in enQueue, the asynchronous calling method of RealCall, which first wraps realCall as AsyncCall that implements Runnable

override fun enqueue(responseCallback: Callback) {
    synchronized(this) { check(! executed) {"Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
Copy the code

Dispatcher

Then go to the enQueue method in the Dispatcher and add the call to the readyAsyncCalls queue

internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
    }
    promoteAndExecute()
  }
Copy the code

It then traverses readyAsyncCalls to transfer executable tasks to the runningAsyncCalls queue, which can then be executed in the thread pool executorService

private fun promoteAndExecute(a): Boolean{ assert(! Thread.holdsLock(this))

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() > =this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }
Copy the code

The thread pool

As with the executorService thread pool, each Client creates a Dispather, which initializes a ThreadPoolExecutor thread pool

private var executorServiceOrNull: ExecutorService? = null

@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
  if (executorServiceOrNull == null) {
    executorServiceOrNull = ThreadPoolExecutor(0.Int.MAX_VALUE, 60, TimeUnit.SECONDS,
        SynchronousQueue(), threadFactory("OkHttp Dispatcher".false))}return executorServiceOrNull!!
}
Copy the code

perform

The asynchronous thread AsyncCall will execute in this thread pool, and the call execution calls the run method, which was described in the previous section