preface

An important part of OkHttp’s ability to process tasks efficiently is that it maintains three task queues (readyAsyncCalls, runningAsyncCalls, runningSyncCalls) and a thread pool (ThreadPoolExecutor). These four things are regulated by the internal task dispenser, Dispathcer, to achieve the effect of efficient multi-task processing.

The role of the thread pool is self-evident. Its main function is to avoid the efficiency and performance problems caused by starting the thread every time we use it for time-consuming tasks, and then destroying the thread after using it. It can operate on threads multiple times and reuse idle threads without having to open and destroy them each time. About thread knowledge, if you do not know can refer to this article I wrote in Java thread details, in the face of the various types of thread pool and internal operations have a detailed introduction.

OkHttp task queue

The task queue in okHttp consists of two parts:

  • Task dispatcher: Is responsible for helping to find the appropriate task queue for tasks that need to be executed
  • ThreadPoolExecutor, a thread pool used to execute tasks assigned by dispatcher
public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher(a) {}public synchronized ExecutorService executorService(a) {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher".false));
    }
    returnexecutorService; }... }Copy the code

The dispatcher instantiates three task queues readyAsyncCalls, runningAsyncCalls and runningSyncCalls, as well as a thread pool, ThreadPoolExecutor.

ReadyAsyncCalls: Queue waiting to execute asynchronous tasks. When there is a task for the dispatcher add it into the thread pool, will first to determine whether a thread pool threads and can perform and if it finds no thread of execution, the first task into the task queue waiting, wait until the idle thread pool thread can perform tasks remove the task from the task queue when the hands of the thread pool.

RunningAsyncCalls: Queue of running asynchronous tasks. Stores tasks that the Dispatcher sends to the thread pool for processing.

RunningSyncCalls: Running synchronization queue. A synchronous queue is different from an asynchronous queue in that it is a serial queue rather than a parallel queue, so this represents a queue running under synchronous operation.

The executeService() method creates a thread pool, ThreadPoolExecutor, in which the first parameter, the number of core threads, is set to 0, indicating that all threads will be destroyed after an idle period.

As you can see, Okhttp builds a thread pool with a threshold of [0, integer.max_value]. It does not keep any minimum number of threads, creates more threads at any time, and only lives for 60 seconds when a thread is idle. It uses a blocking work queue that stores no elements. A thread factory called “OkHttp Dispatcher”.

That is, in actual operation, when 10 concurrent requests are received, 10 threads will be created by the thread pool, and when the work is completed, all threads will be closed in 60 seconds.

The Dispatcher dispenser

The Dispatcher dispatcher is similar to the reverse proxy in Ngnix, which distributes tasks to appropriate idle threads through dispatcher to achieve non-blocking, high availability and high concurrent connection.

A synchronous request

 @Override public Response execute(a) throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this); }}Copy the code

You can see that the synchronous request does four things altogether

  1. Determines whether a task is being executed and throws an exception if so. This means that the same task can be executed only once at a time, but not more than once.

  2. Give the task to the task caller and the Dispatcher call executed to perform the task.

  3. Through getResponseWithInterceptorChain chain () call blocker, then will return to the Response task execution results.

4. Call Dispatcher to finish the task after it is completed.

At this point a synchronous request task is complete. Here about getResponseWithInterceptorChain () performed in some of the interceptor operations, later I will write an article to explain OkHttp principle of interceptor.

Asynchronous operations

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // Add a running request
    runningAsyncCalls.add(call);
       // The thread pool executes the request
    executorService().execute(call);
  } else {
      // Add to the cache queue queue waitingreadyAsyncCalls.add(call); }}Copy the code

In asynchronous operations, the number of tasks in the runningAsyncCalls queue is determined to be greater than the maximum number of requests (maxRequest) which is 64. Then determine whether runningCallsForHost is less than maxRequestsPerHost(a single host request). If one of the two is not met, it means that there are not enough threads in the thread pool to execute the task. In this case, the task is added directly to the cache queue queuing (readyAsyncCalls). When there are threads available, the task is added to the running queue and the thread pool is called to execute the call task.

Take a look at the source code inside execute

@Override protected void execute(a) {
  boolean signalledCallback = false;
  try {
      // Perform time-consuming I/O tasks
    Response response = getResponseWithInterceptorChain(forWebSocket);
    if (canceled) {
      signalledCallback = true;
      // Callback. Note that the callback is in the thread pool, not the presumed main thread callback
      responseCallback.onFailure(RealCall.this.new IOException("Canceled"));
    } else {
      signalledCallback = true;
      // callback, as above
      responseCallback.onResponse(RealCall.this, response); }}catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      logger.log(Level.INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e); }}finally {
      // Most critical code
    client.dispatcher().finished(this); }}Copy the code

Can see there is a call the interceptor chain getResponseWithInterceptorChain (), and the task of the results once again return to the Response. They call back different methods based on whether the mission was Cancled. Canceled calls onFailure(0, which handles the failed logic and calls the successful method Response() with the return value. The dispatcher’s Finish method is invoked to terminate the task on success or failure.

Let’s take a closer look at what happens in the Finish method:

 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if(! calls.remove(call))throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
 
    if (runningCallsCount == 0&& idleCallback ! =null) { idleCallback.run(); }}Copy the code
  • Free the redundant thread and call promoteCalls to invoke the task to be executed
  • Execute idle notification callback thread (idleCallback) if the entire thread pool is idle

Next check out promoteCalls:

 private void promoteCalls(a) {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
 
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
 
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
 
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.}}Copy the code

The logic of promoteCalls is also simple: scan the queue of pending tasks, place the task on the queue of executing tasks, and execute the task.

conclusion

The above is the implementation details of the whole task queue, summed up with the following characteristics:

  1. OkHttp uses Dispatcher technology, similar to Nginx, in conjunction with thread pools to implement highly concurrent, blocked operations.
  2. OkHttp uses queue caching and performs tasks according to the characteristics of first in first out of columns
  3. The highlight of OkHttp is that it calls the finish function in the try/finally, which actively controls wait queue movement rather than locking or wait/notify, greatly reducing coding complexity.

Have interest can pay attention to my small column, learn more knowledge: small column