Preface

While reading the OkHttp source code, I found an AsyncTimeout object in the Transmitter class. Reading further, I found that this class is used for timeout detection. This article summarizes my research into the AsyncTimeout mechanism.

This article is based on OkHttp 3.14.9.

Github: github.com/square/okht…

Gradle dependency: implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.14.9'

AsyncTimeout

The AsyncTimeout class is located in the Okio library and inherits from Timeout. The class has the following comment:

/**
 * This timeout uses a background thread to take action exactly when the timeout occurs. Use this to
 * implement timeouts where they aren't supported natively, such as to sockets that are blocked on
 * writing.
 *
 * <p>Subclasses should override {@link #timedOut} to take action when a timeout occurs. This method
 * will be invoked by the shared watchdog thread so it should not do any long-running operations.
 * Otherwise we risk starving other timeouts from being triggered.
 *
 * <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream. The returned value
 * will apply the timeout to each operation on the wrapped stream.
 *
 * <p>Callers should call {@link #enter} before doing work that is subject to timeouts, and {@link
 * #exit} afterwards. The return value of {@link #exit} indicates whether a timeout was triggered.
 * Note that the call to {@link #timedOut} is asynchronous, and may be called after {@link #exit}.
 */
public class AsyncTimeout extends Timeout {

Here are a few useful bits of information:

  • It is a tool that uses a single shared background thread to detect timeouts, mainly for operations that do not support timeouts natively.
  • It provides a timedOut() method as the callback that is invoked when a timeout is detected.
  • It provides sink() and source() methods that apply timeout detection to stream reads and writes; these correspond to the stream reads and writes of a network request, as discussed later.
  • It provides enter() and exit() to start and stop the timer. In other words, timing starts when enter() is called.
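
The enter()/exit() contract described above can be sketched with JDK classes only. This is a simplified model, not Okio's implementation: MiniTimeout and its fields are hypothetical names, and a ScheduledExecutorService stands in for the shared watchdog thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for AsyncTimeout: one shared daemon thread fires timedOut(). */
class MiniTimeout {
  // A single shared background thread, playing the role of the watchdog.
  private static final ScheduledExecutorService WATCHDOG =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "mini-watchdog");
        t.setDaemon(true);
        return t;
      });

  private final long timeoutMillis;
  private final AtomicBoolean fired = new AtomicBoolean();
  private ScheduledFuture<?> task;

  MiniTimeout(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  /** Callback invoked on the watchdog thread; subclasses override it and keep it short. */
  protected void timedOut() {}

  /** Starts the clock. */
  final void enter() {
    if (task != null) throw new IllegalStateException("Unbalanced enter/exit");
    fired.set(false);
    task = WATCHDOG.schedule(() -> {
      fired.set(true);
      timedOut();
    }, timeoutMillis, TimeUnit.MILLISECONDS);
  }

  /** Stops the clock and returns true if the timeout fired in the meantime. */
  final boolean exit() {
    if (task == null) return false;
    task.cancel(false);
    task = null;
    return fired.get();
  }
}
```

A caller brackets the guarded work with enter() and exit(), then checks the return value of exit() to learn whether the work overran its limit.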

Timeout

With all this talk about timeout detection, where does the timeout value come from? Take a look at the following definitions in Timeout:

  /**
   * True if {@code deadlineNanoTime} is defined. There is no equivalent to null
   * or 0 for {@link System#nanoTime}.
   */
  private boolean hasDeadline;
  private long deadlineNanoTime;
  private long timeoutNanos;

Timeout defines deadlineNanoTime, the deadline (an absolute point in time), and timeoutNanos, the timeout duration. The subclass AsyncTimeout uses timeoutNanos, together with the deadline if one is set, to compute when a timeout occurs.
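
The difference between the two fields can be made concrete with a small model. This is a sketch under stated assumptions: TimeoutFields is a hypothetical class that only mirrors the three fields shown above, with timeoutNanos as a duration and deadlineNanoTime as a point on the System.nanoTime() clock.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the three fields shown above. */
final class TimeoutFields {
  private boolean hasDeadline;
  private long deadlineNanoTime;
  private long timeoutNanos;

  /** A relative limit: each operation may take at most this long. 0 means no timeout. */
  TimeoutFields timeout(long duration, TimeUnit unit) {
    if (duration < 0) throw new IllegalArgumentException("duration < 0: " + duration);
    this.timeoutNanos = unit.toNanos(duration);
    return this;
  }

  /** An absolute limit: all work must finish before now + duration. */
  TimeoutFields deadline(long duration, TimeUnit unit) {
    if (duration <= 0) throw new IllegalArgumentException("duration <= 0: " + duration);
    this.hasDeadline = true;
    this.deadlineNanoTime = System.nanoTime() + unit.toNanos(duration);
    return this;
  }

  boolean hasDeadline() {
    return hasDeadline;
  }

  long timeoutNanos() {
    return timeoutNanos;
  }

  long deadlineNanoTime() {
    if (!hasDeadline) throw new IllegalStateException("No deadline");
    return deadlineNanoTime;
  }
}
```

The separate hasDeadline flag is needed because no value of System.nanoTime() can serve as "unset", which is what the comment on the field points out.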

AsyncTimeout field definitions

Let's look at some of the field definitions in AsyncTimeout:

  private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;

  private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
  private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);

  static @Nullable AsyncTimeout head;

  private boolean inQueue;

  private @Nullable AsyncTimeout next;

  private long timeoutAt;
  • timeoutAt: records the exact time at which the timeout occurs. It is calculated as the time at which timing started plus the timeoutNanos described above.
  • head and next: as the AsyncTimeout comment mentions, a single shared background thread performs the timeout detection. The head and next fields form a linked list that chains the AsyncTimeout objects into a queue, so they can be checked in order each time timeout detection runs. We'll come back to this later.
  • inQueue: set to true once the AsyncTimeout object has been added to the list.

Use of AsyncTimeout in the network request process

Let’s look at the application of AsyncTimeout in the network request process.

  • The Transmitter holds an AsyncTimeout field whose timeoutNanos is set in the Transmitter constructor. The value is the callTimeout configured when the OkHttpClient is built. This timeout covers the total time of the entire call.

    private final AsyncTimeout timeout = new AsyncTimeout() {
        @Override protected void timedOut() {
            cancel();
        }
    };
    // ...
    public Transmitter(OkHttpClient client, Call call) {
        this.client = client;
        this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
        this.call = call;
        this.eventListener = client.eventListenerFactory().create(call);
        this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
    }

    callTimeout: the timeout for the entire call. The default value is 0, which means no timeout.

  • When a connection is established, RealConnection.connectSocket() is called. Once the socket is connected, it creates two Okio objects: a BufferedSource and a BufferedSink.

    // RealConnection.connectSocket()
    // RealConnection.java, line 275
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
    
    // Okio.java, line 221
    public static Source source(Socket socket) throws IOException {
        if (socket == null) throw new IllegalArgumentException("socket == null");
        if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
        AsyncTimeout timeout = timeout(socket);
        Source source = source(socket.getInputStream(), timeout);
        return timeout.source(source);
    }
    
    // Okio.java, line 115
    public static Sink sink(Socket socket) throws IOException {
        if (socket == null) throw new IllegalArgumentException("socket == null");
        if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
        AsyncTimeout timeout = timeout(socket);
        Sink sink = sink(socket.getOutputStream(), timeout);
        return timeout.sink(sink);
    }
    
    // RealConnection.java, line 542
    ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
        if (http2Connection != null) {
          return new Http2ExchangeCodec(client, this, chain, http2Connection);
        } else {
          socket.setSoTimeout(chain.readTimeoutMillis());
          source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
          sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
          return new Http1ExchangeCodec(client, this, source, sink);
        }
    }

    Creating the BufferedSource or BufferedSink first creates an AsyncTimeout, which then wraps the underlying source or sink. This is the decorator pattern: the wrapped source and sink gain timeout capability. Later, when the ExchangeCodec is created, the readTimeout and writeTimeout configured on the OkHttpClient are applied, corresponding to the read and write timeouts.

    readTimeout: the read timeout. The default value is 10 seconds.

    writeTimeout: the write timeout. The default value is 10 seconds.
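
The decorator idea can be sketched with plain JDK streams. This is an illustration only, not Okio's code: Guard and GuardedInputStream are hypothetical names, and Guard models just the enter()/exit() pair of a timeout object.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

/** Hypothetical timer contract: exit() reports whether the timeout fired. */
interface Guard {
  void enter();

  boolean exit();
}

/** Decorator: every read on the wrapped stream is bracketed by enter() and exit(). */
final class GuardedInputStream extends FilterInputStream {
  private final Guard guard;

  GuardedInputStream(InputStream delegate, Guard guard) {
    super(delegate);
    this.guard = guard;
  }

  @Override public int read(byte[] b, int off, int len) throws IOException {
    guard.enter();
    int result;
    try {
      result = in.read(b, off, len);
    } catch (IOException e) {
      guard.exit(); // always balance enter(), even on failure
      throw e;
    }
    // A fired timeout is surfaced to the caller as an exception.
    if (guard.exit()) throw new InterruptedIOException("timeout");
    return result;
  }

  @Override public int read() throws IOException {
    byte[] one = new byte[1];
    int n = read(one, 0, 1);
    return n == -1 ? -1 : one[0] & 0xff;
  }
}
```

The wrapped stream needs no knowledge of timeouts; the decorator adds that behavior around each call.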

P.S. Because the socket itself supports a connect timeout, connectTimeout does not need the AsyncTimeout mechanism.

AsyncTimeout timeout detection

Joining the queue and starting detection
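
For comparison, the native connect timeout looks like this with the JDK alone. This is a minimal sketch: ConnectTimeoutDemo is a hypothetical helper, and the only library call it relies on is java.net.Socket.connect(SocketAddress, int).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class ConnectTimeoutDemo {
  /**
   * Connects using the socket's built-in connect timeout. If the timeout expires first,
   * connect() throws java.net.SocketTimeoutException, so no watchdog thread is needed.
   */
  static Socket connect(InetSocketAddress address, int connectTimeoutMillis) throws IOException {
    Socket socket = new Socket();
    try {
      socket.connect(address, connectTimeoutMillis);
      return socket;
    } catch (IOException e) {
      socket.close(); // do not leak the socket when the connect fails
      throw e;
    }
  }
}
```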

  // AsyncTimeout.java, line 72
  public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
  }

The AsyncTimeout.enter() method is shown above; calling it starts timeout detection. The key call is the last one, scheduleTimeout(this, timeoutNanos, hasDeadline), which is a static method. Remember that AsyncTimeout has a static field called head? Let's look at this method.

  // AsyncTimeout.java, line 83
  private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

    long now = System.nanoTime();
    if (timeoutNanos != 0 && hasDeadline) {
      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
      // Math.min() is undefined for absolute values, but meaningful for relative ones.
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      node.timeoutAt = node.deadlineNanoTime();
    } else {
      throw new AssertionError();
    }

    // Insert the node in sorted order.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        node.next = prev.next;
        prev.next = node;
        if (prev == head) {
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
  }

P.S. node.remainingNanos(now) computes the interval between the current time and the node's timeout time.
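
The reason for comparing remaining intervals rather than absolute timestamps is that System.nanoTime() values can wrap around, as the source comment notes. The sketch below shows the idea; NanoCompare and its method names are hypothetical, and remainingNanos is modeled as the plain difference timeoutAt - now.

```java
final class NanoCompare {
  /** Time left until timeoutAt, relative to now; negative once the timeout has passed. */
  static long remainingNanos(long timeoutAt, long now) {
    return timeoutAt - now;
  }

  /** Overflow-safe "a expires before b" check: compares remaining times, not raw values. */
  static boolean expiresBefore(long timeoutAtA, long timeoutAtB, long now) {
    return remainingNanos(timeoutAtA, now) < remainingNanos(timeoutAtB, now);
  }
}
```

With now close to Long.MAX_VALUE, a timestamp 20 ns in the future wraps to a negative number, so a raw comparison would order it before one that is only 5 ns away; comparing the differences still gives the right order.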

The method mainly does three things:

  • If the static variable head is null, global detection has not started yet, so the Watchdog detection thread needs to be started. P.S. head is only a marker for the start of the queue; it is not itself a timeout being detected.
  • It calculates timeoutAt, the timeout time of the node being added to the detection queue.
  • It inserts the node into the global detection queue in sorted order, from the smallest timeoutAt to the largest, which the Watchdog detection mechanism relies on. Because the queue is a linked list, only the next pointers need to change. The order is illustrated in the following figure (a journey diagram used in place of a timeline):
[Figure: AsyncTimeout queue order (unit: ns), showing now, timeoutAt, and timeoutAt2]
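
The sorted insertion can be sketched on its own, without threads. This is a simplified model: TimeoutNode, insertSorted, and describe are hypothetical names, and the boolean result marks the case in which the real code wakes the watchdog.

```java
/** Hypothetical node type: a singly linked queue ordered by timeoutAt, with a sentinel head. */
final class TimeoutNode {
  final String name;
  final long timeoutAt;
  TimeoutNode next;

  TimeoutNode(String name, long timeoutAt) {
    this.name = name;
    this.timeoutAt = timeoutAt;
  }

  /** Inserts node so the queue stays sorted; returns true if it became the first real entry. */
  static boolean insertSorted(TimeoutNode head, TimeoutNode node, long now) {
    long remaining = node.timeoutAt - now;
    for (TimeoutNode prev = head; ; prev = prev.next) {
      if (prev.next == null || remaining < prev.next.timeoutAt - now) {
        node.next = prev.next;
        prev.next = node;
        return prev == head; // inserted at the front: the watchdog would be notified
      }
    }
  }

  /** Renders the queue, skipping the sentinel head. */
  static String describe(TimeoutNode head) {
    StringBuilder sb = new StringBuilder();
    for (TimeoutNode n = head.next; n != null; n = n.next) {
      if (sb.length() > 0) sb.append(" -> ");
      sb.append(n.name).append('@').append(n.timeoutAt);
    }
    return sb.toString();
  }
}
```

Inserting b@300, then a@100, then c@200 with now = 0 yields the queue a@100 -> c@200 -> b@300; only the first two insertions land at the front.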

Watchdog

The Watchdog is the detection thread of the entire AsyncTimeout timeout detection mechanism.

private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }
  • awaitTimeout() finds the AsyncTimeout object that has timed out.
  • If timedOut is null, detection continues with the next loop iteration.
  • If timedOut is head, the linked list holds no objects to detect, so head is set to null and the thread exits.
  • If timedOut is a valid timed-out object, its timedOut() method is called, which notifies the subclass that overrides it.
  • It is worth mentioning that the Watchdog constructor calls setDaemon(true), making it a daemon thread. The benefit is that it does not keep the JVM alive: once all non-daemon threads have finished, the JVM can exit and the Watchdog ends with it.
  • Because the Watchdog is the thread that performs detection, timedOut() should not perform time-consuming operations, so as not to delay subsequent detection.
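
The daemon setting can be shown in isolation. This is a minimal sketch with a hypothetical DaemonDemo helper; it relies only on Thread.setDaemon(boolean), which must be called before the thread is started.

```java
final class DaemonDemo {
  /** Creates a named daemon thread; a daemon thread does not prevent the JVM from exiting. */
  static Thread newWatchdog(Runnable loop) {
    Thread t = new Thread(loop, "demo-watchdog");
    t.setDaemon(true); // must be set before start()
    return t;
  }
}
```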

awaitTimeout()

The Watchdog thread finds the timed-out AsyncTimeout by calling awaitTimeout().

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    AsyncTimeout node = head.next;

    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    if (node == null) {
      long startNanos = System.nanoTime();
      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
          ? head  // The idle timeout elapsed.
          : null; // The situation has changed.
    }

    long waitNanos = node.remainingNanos(System.nanoTime());

    // The head of the queue hasn't timed out yet. Await that.
    if (waitNanos > 0) {
      // Waiting is made complicated by the fact that we work in nanoseconds,
      // but the API wants (millis, nanos) in two arguments.
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }

    // The head of the queue has timed out. Remove it.
    head.next = node.next;
    node.next = null;
    return node;
  }

As the code shows, awaitTimeout() only ever checks head.next, the node with the earliest timeout.

  • If head.next is null, the thread first waits for up to 60 s.
    • If there is still no node after the wait has fully elapsed, the timeout detection queue is considered empty and the Watchdog thread ends. A new one is started the next time a timeout is scheduled.
    • Otherwise it returns null, and the caller moves on to the next loop iteration.
  • If the timeout time of head.next is later than the current time, the thread waits for the difference between the timeout time and the current time. After waking up it also returns null, and the caller moves on to the next loop iteration.
  • If head.next has already timed out, it is removed from the list and returned.
  • awaitTimeout() uses Java's wait() and notify()/notifyAll() mechanism. scheduleTimeout(this, timeoutNanos, hasDeadline) calls AsyncTimeout.class.notify() when the new node is inserted at the front of the linked list. This wakes the Watchdog so that it recomputes its wait for the new earliest timeout instead of sleeping past it.
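
The timed wait and the early wake-up can be sketched with a plain lock object. This is an illustration under stated assumptions: WakeableWait, queueChanged, awaitChange, and signalChange are hypothetical names, with signalChange playing the role of scheduleTimeout() inserting at the front of the queue.

```java
final class WakeableWait {
  private static final Object LOCK = new Object();
  private static boolean queueChanged;

  /** Splits a nanosecond wait into the (millis, nanos) pair that Object.wait expects. */
  static long[] split(long waitNanos) {
    long millis = waitNanos / 1_000_000L;
    long nanos = waitNanos - millis * 1_000_000L;
    return new long[] {millis, nanos};
  }

  /**
   * Waits up to maxWaitNanos and returns early if signalChange() is called.
   * Returns the elapsed time in nanoseconds. A single wait may also end spuriously,
   * which is why the real watchdog re-checks the queue in a loop.
   */
  static long awaitChange(long maxWaitNanos) throws InterruptedException {
    if (maxWaitNanos <= 0) throw new IllegalArgumentException("wait(0, 0) would block forever");
    long start = System.nanoTime();
    synchronized (LOCK) {
      if (!queueChanged) {
        long[] parts = split(maxWaitNanos);
        LOCK.wait(parts[0], (int) parts[1]);
      }
      queueChanged = false;
    }
    return System.nanoTime() - start;
  }

  /** Wakes the waiting thread, as a front-of-queue insertion would. */
  static void signalChange() {
    synchronized (LOCK) {
      queueChanged = true;
      LOCK.notify();
    }
  }
}
```

A waiter that asks for 60 seconds returns almost immediately once another thread calls signalChange(), which is the effect the notify() call has on the Watchdog.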


Exiting detection

When the work finishes, exit() must be called to remove the AsyncTimeout object from the list. If the object is not found in the list, it has already timed out.

  /** Returns true if the timeout occurred. */
  public final boolean exit() {
    if (!inQueue) return false;
    inQueue = false;
    return cancelScheduledTimeout(this);
  }

  /** Returns true if the timeout occurred. */
  private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the node from the linked list.
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }

    // The node wasn't found in the linked list: it must have timed out!
    return true;
  }
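
The removal logic can be exercised on a tiny list. This is a simplified sketch: QueueNode and cancel are hypothetical names that mirror the shape of cancelScheduledTimeout() without the synchronization.

```java
/** Hypothetical node type for a sentinel-headed singly linked queue. */
final class QueueNode {
  QueueNode next;

  /** Unlinks node; returns true if it was not in the queue, meaning it had already timed out. */
  static boolean cancel(QueueNode head, QueueNode node) {
    for (QueueNode prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false; // still queued, so no timeout happened
      }
    }
    return true; // not found: the watchdog must have removed it already
  }
}
```

The return value is inferred from absence: the only other code that removes a node from the queue is the watchdog, and it does so only when the node has timed out.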