This article analyzes two tools that the previous article mentioned but did not analyze: Dispatcher and AsyncTimeout.

Dispatcher

This class schedules asynchronous requests, using an internal thread pool to run them, and also keeps track of the synchronous requests in progress. Through the Dispatcher you can query the synchronous and asynchronous requests currently running and the asynchronous requests still waiting. You can also call cancelAll() to cancel all of them.

The entry points of the Dispatcher are simple, as described in the previous article.

  • executed(RealCall call): called when a synchronous request starts
  • finished(RealCall call): called when a synchronous request ends
  • enqueue(AsyncCall call): called when an asynchronous request is submitted
  • finished(AsyncCall call): called when an asynchronous request ends
  • cancelAll(): cancels all requests

AsyncCall wraps a RealCall and implements Runnable, because it is scheduled on a thread pool. Let's analyze each piece of functionality in detail.

Storage data structure

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

Internally, the Dispatcher stores calls in Deques (double-ended queues, which can be accessed from both ends). There are three of them: readyAsyncCalls, runningAsyncCalls, and runningSyncCalls.

  1. readyAsyncCalls stores asynchronous requests waiting to be executed
  2. runningAsyncCalls stores asynchronous requests that are running
  3. runningSyncCalls stores synchronous requests that are running

An asynchronous request

void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);
  }
  promoteAndExecute();
}

An asynchronous request goes through the enqueue method, which simply adds it to the queue of waiting asynchronous calls. The actual scheduling is then handled in one place, by the promoteAndExecute method.

private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();

      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

      i.remove();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}

The method first iterates over readyAsyncCalls, which holds the AsyncCalls passed to enqueue. A call is promoted only if two capacity conditions hold: the total number of running asynchronous calls is below maxRequests, and the number of running calls for the same host is below maxRequestsPerHost. Promoted calls are moved to runningAsyncCalls and handed to the thread pool. The isRunning variable indicates whether any synchronous or asynchronous request is currently running. If enqueue is called while the capacity limits are already reached, the call stays in readyAsyncCalls and is not executed at that point. So when does it run?

private <T> void finished(Deque<T> calls, T call) {
  ...
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    ...
  }

  boolean isRunning = promoteAndExecute();
  ...
}

The answer is in the finished method, which is called after every synchronous or asynchronous request completes. It first removes the call from its queue and then calls promoteAndExecute to schedule again. Any waiting call that now fits within the capacity limits is started. This ensures that a queued asynchronous request eventually runs.
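
The interplay between enqueue, promotion, and finished can be sketched without OkHttp. This is a minimal sketch, not the library's code: MiniDispatcher and its methods are hypothetical names, calls are reduced to their host strings, and nothing is actually executed on a thread pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified stand-in for the Dispatcher; not OkHttp code. */
class MiniDispatcher {
  private final int maxRequests;
  private final int maxRequestsPerHost;
  private final Deque<String> ready = new ArrayDeque<>();   // hosts of waiting calls
  private final Deque<String> running = new ArrayDeque<>(); // hosts of running calls

  MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
    this.maxRequests = maxRequests;
    this.maxRequestsPerHost = maxRequestsPerHost;
  }

  /** Queues a call (identified only by its host) and returns the calls promoted by this step. */
  synchronized List<String> enqueue(String host) {
    ready.add(host);
    return promote();
  }

  /** Marks one running call for the host as finished, then promotes waiting calls. */
  synchronized List<String> finished(String host) {
    if (!running.remove(host)) throw new AssertionError("Call wasn't in-flight!");
    return promote();
  }

  /** Moves waiting calls to the running queue while both limits allow it. */
  private List<String> promote() {
    List<String> promoted = new ArrayList<>();
    for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
      String host = i.next();
      if (running.size() >= maxRequests) break;                 // global limit reached
      if (runningForHost(host) >= maxRequestsPerHost) continue; // this host is saturated
      i.remove();
      running.add(host);
      promoted.add(host);
    }
    return promoted;
  }

  private int runningForHost(String host) {
    int count = 0;
    for (String h : running) {
      if (h.equals(host)) count++;
    }
    return count;
  }

  synchronized int runningCount() { return running.size(); }

  synchronized int readyCount() { return ready.size(); }
}
```

With limits of 2 total and 1 per host, a second call to the same host stays in the ready queue until finished is called for the first one, which is the guarantee described above.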

A synchronous request

synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

A synchronous request simply stores its call in runningSyncCalls. Nothing else is needed, because the caller of a synchronous request runs the interceptor chain itself, outside the Dispatcher. When the synchronous request completes, finished is called to remove the call.

void finished(RealCall call) {
  finished(runningSyncCalls, call);
}

What is the use of tracking synchronous requests?

  • First, the synchronous requests in progress are recorded, so this information can be queried from outside.
  • Second, callers can use cancelAll() to cancel all requests, and synchronous requests must be tracked for the cancellation to cover them.

Request number limit

private int maxRequests = 64;
private int maxRequestsPerHost = 5;

Limiting the number of concurrent OkHttp requests is also the Dispatcher's responsibility.

  1. maxRequests is the maximum number of asynchronous requests that may run at the same time. It can be changed with the setMaxRequests method.
  2. maxRequestsPerHost is the maximum number of asynchronous requests that may run at the same time per host. Here the host is the host component of the URL as defined in OkHttp, not the IP address it resolves to. Some examples:

URL                    host
http://android.com/    android.com
http://127.0.0.1/      127.0.0.1
http://xn--n3h.net/    xn--n3h.net
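
To illustrate what "per host" means, the following sketch groups URLs by their host component. It uses java.net.URI from the JDK purely as a stand-in for OkHttp's own URL handling, so edge cases may differ; HostCount and its methods are hypothetical names.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; java.net.URI is only a stand-in for OkHttp's URL parsing. */
class HostCount {
  /** Returns the host component of the URL, without scheme, port, or path. */
  static String hostOf(String url) {
    return URI.create(url).getHost();
  }

  /** Counts how many URLs map to each host; scheme and path do not matter. */
  static Map<String, Integer> countPerHost(List<String> urls) {
    Map<String, Integer> counts = new HashMap<>();
    for (String url : urls) {
      counts.merge(hostOf(url), 1, Integer::sum);
    }
    return counts;
  }
}
```

Two requests to different paths on the same host are counted together in this sketch, which is how the per-host limit treats them.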

Idle callback

private @Nullable Runnable idleCallback;

OkHttp provides an idle callback: idleCallback is invoked when no synchronous or asynchronous requests are running. You can set it with setIdleCallback(@Nullable Runnable idleCallback). The code that runs it is as follows:

private <T> void finished(Deque<T> calls, T call) {
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    idleCallback = this.idleCallback;
  }

  boolean isRunning = promoteAndExecute();

  if (!isRunning && idleCallback != null) {
    idleCallback.run();
  }
}

After a network request completes, the Dispatcher's finished method is called. promoteAndExecute uses runningCallsCount() to get the number of running requests, and idleCallback.run() is called when no synchronous or asynchronous request is running.

public synchronized int runningCallsCount() {
  return runningAsyncCalls.size() + runningSyncCalls.size();
}
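
The idle-callback logic can be reduced to a counter. The following is a minimal sketch under that assumption: IdleTracker, started(), and finished() are hypothetical names, and a single counter replaces the two running queues.

```java
/** Hypothetical reduction of the idle-callback logic to a single counter. */
class IdleTracker {
  private int running;           // number of calls currently in flight
  private Runnable idleCallback; // may be null

  synchronized void setIdleCallback(Runnable callback) {
    this.idleCallback = callback;
  }

  synchronized void started() {
    running++;
  }

  void finished() {
    Runnable callback;
    boolean isRunning;
    synchronized (this) {
      if (running == 0) throw new AssertionError("Call wasn't in-flight!");
      running--;
      isRunning = running > 0;
      callback = idleCallback;
    }
    // Run the callback outside the lock, mirroring the structure of finished() above.
    if (!isRunning && callback != null) {
      callback.run();
    }
  }
}
```

The callback fires only when the last running call finishes, not after every call.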

The thread pool

private @Nullable ExecutorService executorService;

The Dispatcher creates a thread pool internally by default, or a custom one can be passed in through its constructor. A custom pool must be able to run the maximum number of concurrent requests: for example, if maxRequests is 10, the pool must be able to run at least 10 tasks at the same time.

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

This is the thread pool the Dispatcher creates internally. It has no core threads, the maximum number of worker threads is Integer.MAX_VALUE, and idle threads are reclaimed after 60 seconds.
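
The effect of this configuration can be observed with the JDK alone. The sketch below builds a pool with the same core size, maximum size, keep-alive, and SynchronousQueue (but the default thread factory), and reports how many threads exist while several blocking tasks run. PoolDemo and poolSizeWhileRunning are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a zero-core, unbounded pool backed by a SynchronousQueue. */
class PoolDemo {
  /** Submits blocking tasks and returns the pool size observed while all of them run. */
  static int poolSizeWhileRunning(int tasks) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    CountDownLatch started = new CountDownLatch(tasks);
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          started.countDown();
          try {
            release.await(); // keep the thread busy until the measurement is taken
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      started.await(); // every task is now running on its own thread
      return pool.getPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }
}
```

Because a SynchronousQueue holds no tasks, each submitted task that finds no idle thread gets a new one, so the pool itself never limits concurrency; the limit comes from maxRequests and maxRequestsPerHost.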

Cancel request action

The Dispatcher provides an API to cancel all requests

public synchronized void cancelAll() {
  for (AsyncCall call : readyAsyncCalls) {
    call.get().cancel();
  }

  for (AsyncCall call : runningAsyncCalls) {
    call.get().cancel();
  }

  for (RealCall call : runningSyncCalls) {
    call.cancel();
  }
}

It iterates over all three queues and calls cancel on each underlying RealCall. The logic is simple.

// RealCall
@Override public void cancel() {
  retryAndFollowUpInterceptor.cancel();
}

// RetryAndFollowUpInterceptor
public void cancel() {
  canceled = true;
  StreamAllocation streamAllocation = this.streamAllocation;
  if (streamAllocation != null) streamAllocation.cancel();
}

Underneath, cancellation ends up in StreamAllocation's cancel method. StreamAllocation is a new class here and a very important one. It will be covered when we discuss RetryAndFollowUpInterceptor.

AsyncTimeout timeout handling

The AsyncTimeout class is responsible for handling request timeouts. In the earlier introduction to OkHttpClient, we saw the timeout fields:

final int callTimeout;
final int connectTimeout; // Socket connection time limit
final int readTimeout; // Socket read time limit
final int writeTimeout; // Socket write time limit

The following describes how these timeout fields work by analyzing AsyncTimeout. AsyncTimeout extends okio.Timeout. The okio.Timeout class is mainly responsible for configuring the time limit, in two ways:

  • the timeout() method sets a duration, and the timeout triggers once an operation has run for that long
  • the deadlineNanoTime() method sets a deadline, and the timeout triggers once that point in time is reached

Once the time limit is set, AsyncTimeout handles the timeout. AsyncTimeout#enter() starts the timer, and AsyncTimeout#exit() signals that the work is done and stops it. If the time limit is exceeded before the work completes, AsyncTimeout#timedOut() is called. It is a protected method that subclasses override with their own timeout logic. That is the overall timeout process. Now let's look at how it is implemented. When enter() is called, it calls scheduleTimeout internally, which starts the timeout thread if necessary.

static @Nullable AsyncTimeout head;

/** True if this node is currently in the queue. */
private boolean inQueue;

/** The next node in the linked list. */
private @Nullable AsyncTimeout next;
private static synchronized void scheduleTimeout(
    AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
  if (head == null) {
    head = new AsyncTimeout();
    new Watchdog().start();
  }

  long now = System.nanoTime();
  if (timeoutNanos != 0 && hasDeadline) {
    node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
  } else if (timeoutNanos != 0) {
    node.timeoutAt = now + timeoutNanos;
  } else if (hasDeadline) {
    node.timeoutAt = node.deadlineNanoTime();
  } else {
    throw new AssertionError();
  }

  // Insert the node in sorted order.
  long remainingNanos = node.remainingNanos(now);
  for (AsyncTimeout prev = head; true; prev = prev.next) {
    if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
      node.next = prev.next;
      prev.next = node;
      if (prev == head) {
        AsyncTimeout.class.notify();
      }
      break;
    }
  }
}

Internally a linked list is used: head is the head of the list, next points to the next node, and inQueue indicates whether the node is currently in the list. scheduleTimeout creates an empty head node if none exists yet and starts the Watchdog thread. The rest of the method computes when the node times out and inserts it into the list, sorted by remaining time. Watchdog is a Thread, and its logic is in run.
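
The sorted insertion behind a sentinel head can be sketched in isolation. This is a simplified illustration with hypothetical names (TimeoutQueue, Node): it compares absolute timeoutAt values directly, whereas the code above compares remaining time relative to now, and it returns a flag where the original notifies the waiting thread.

```java
/** Hypothetical sketch of a singly linked list kept sorted by timeout. */
class TimeoutQueue {
  static final class Node {
    final String name;
    final long timeoutAt; // absolute time in nanoseconds
    Node next;

    Node(String name, long timeoutAt) {
      this.name = name;
      this.timeoutAt = timeoutAt;
    }
  }

  private final Node head = new Node("head", 0L); // sentinel; holds no timeout itself

  /**
   * Inserts the node so the list stays sorted by timeoutAt. Returns true if the node
   * became the first real node, which is the case where a watchdog would need waking.
   */
  boolean insert(Node node) {
    for (Node prev = head; true; prev = prev.next) {
      if (prev.next == null || node.timeoutAt < prev.next.timeoutAt) {
        node.next = prev.next;
        prev.next = node;
        return prev == head;
      }
    }
  }

  /** Returns the node names in list order, comma separated. */
  String order() {
    StringBuilder sb = new StringBuilder();
    for (Node n = head.next; n != null; n = n.next) {
      if (sb.length() > 0) sb.append(',');
      sb.append(n.name);
    }
    return sb.toString();
  }
}
```

Keeping the list sorted means only the first node ever needs to be checked to find the next timeout.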

public void run() {
  while (true) {
    try {
      AsyncTimeout timedOut;
      synchronized (AsyncTimeout.class) {
        timedOut = awaitTimeout();
        if (timedOut == null) continue;
        if (timedOut == head) {
          head = null;
          return;
        }
      }

      // Close the timed out node.
      timedOut.timedOut();
    } catch (InterruptedException ignored) {
    }
  }
}

The run method calls awaitTimeout to get the AsyncTimeout that has timed out and invokes its timedOut method. If awaitTimeout returns the head node itself, the queue has stayed empty for the whole idle period, so head is cleared and the Watchdog thread exits.

private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
  // Get the next eligible node.
  AsyncTimeout node = head.next;

  if (node == null) {
    long startNanos = System.nanoTime();
    AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
    return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
        ? head  // The idle timeout elapsed.
        : null; // The situation has changed.
  }

  long waitNanos = node.remainingNanos(System.nanoTime());

  // The head of the queue hasn't timed out yet. Await that.
  if (waitNanos > 0) {
    // Waiting is made complicated by the fact that we work in nanoseconds,
    // but the API wants (millis, nanos) in two arguments.
    long waitMillis = waitNanos / 1000000L;
    waitNanos -= (waitMillis * 1000000L);
    AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
    return null;
  }

  // The head of the queue has timed out. Remove it.
  head.next = node.next;
  node.next = null;
  return node;
}

Waiting is done with AsyncTimeout.class.wait(waitMillis, (int) waitNanos). If only the head node is left in the list, the thread waits for up to IDLE_TIMEOUT_MILLIS. If nothing has been added by then, the thread exits, and a new one is started the next time a node is added. Because the list is sorted by time, the Watchdog only needs to wait for the first node each time. The logic is simple.
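
The conversion from a nanosecond duration to the (millis, nanos) pair that Object.wait(long, int) expects is plain integer arithmetic. A small sketch, with WaitSplit and split as hypothetical names:

```java
/** Hypothetical helper showing the millis/nanos split used before Object.wait(long, int). */
class WaitSplit {
  /** Returns {millis, nanos}, where nanos is the remainder below one millisecond. */
  static long[] split(long waitNanos) {
    long millis = waitNanos / 1_000_000L;
    long nanos = waitNanos - millis * 1_000_000L; // 0..999999 for non-negative input
    return new long[] {millis, nanos};
  }
}
```

For example, 2,500,000,123 ns splits into 2500 ms and 123 ns.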

How is exit() implemented to complete the operation?

/** Returns true if the timeout occurred. */
public final boolean exit() {
  if (!inQueue) return false;
  inQueue = false;
  return cancelScheduledTimeout(this);
}

private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
  for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
    if (prev.next == node) {
      prev.next = node.next;
      node.next = null;
      return false;
    }
  }
  return true;
}

The exit() method removes the node from the list so that it no longer triggers a timeout. If the node is not found, the Watchdog has already removed it, which means the timeout occurred. The logic of AsyncTimeout is relatively simple: it keeps pending timeouts in a linked list and waits for them with wait(). Every AsyncTimeout on which enter() is called is linked in after the static head, waiting to be processed, and its timedOut method is called when it times out. The design is clever.
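
The return value of the removal step is easy to misread, so here is a minimal sketch of it in isolation. ExitDemo, add, pollFirst, and cancel are hypothetical names, and pollFirst merely imitates a watchdog that has taken a timed-out node off the list.

```java
/** Hypothetical sketch of "remove on exit" for a linked list with a sentinel head. */
class ExitDemo {
  static final class Node {
    Node next;
  }

  private final Node head = new Node(); // sentinel

  /** Links the node directly after the head (ordering is ignored in this sketch). */
  void add(Node node) {
    node.next = head.next;
    head.next = node;
  }

  /** Imitates the watchdog removing the first node after it has timed out. */
  Node pollFirst() {
    Node node = head.next;
    if (node == null) return null;
    head.next = node.next;
    node.next = null;
    return node;
  }

  /** Returns false if the node was found and unlinked, true if it was already gone. */
  boolean cancel(Node node) {
    for (Node prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }
    return true;
  }
}
```

A true result therefore means "the timeout already fired", which matches the doc comment on exit().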

The next article will examine the interceptors, which are at the heart of OkHttp.