Okhttp series:

You want series: Network request framework OkHttp3 full solution series – (I) basic use of OkHttp

OkHttp: OkHttp: OkHttp: OkHttp: OkHttp: OkHttp

You want series: network request frame OkHttp3 full solution series – (3) the interceptor, rounding 1: retry redirection, Bridges, cache (key)

Network Request framework OkHttp3 Full solution series – (4) Interceptor details 2: Connect, request services (key)


In the previous article in this series, we looked at the first three interceptors in the OkHttp interceptor chain: RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, they made some preprocessing before request connection is established.

If the request passes through these three interceptors and then continues, a network request is required (not directly satisfied by the cache), which is what we’ll be analyzing today — the remaining two interceptors: ConnectInterceptor and CallServerInterceptor are responsible for connection establishment and request service reading and writing respectively.

Background – HTTP protocol development

Before getting into interceptors, it’s important to have some background on the HTTP protocol, because okHTTP’s network connection is implemented on this basis. The HTTP protocol goes through the following three versions.

HTTP1.0

In HTTP1.0, a single request establishes a TCP connection, and when the request is complete, the connection is actively disconnected. The advantage of this approach is that it is simple and the requests do not interfere with each other. But each request goes through three handshakes, two or four waves of connection establishment and disconnection — greatly affecting network efficiency and overhead.

HTTP1.1

In HTTP1.1, the connection cannot be reused in HTTP1.0, and persistent connections are supported using keep-alive: After an HTTP request is completed, the TCP connection is not immediately disconnected. If a new HTTP request is sent to the same Host as the last one, the TCP connection is directly reused. This reduces the cost and latency of establishing and closing connections. Keep-alive is turned on by default in HTTP1.1 — add: Connection :keep-alive to the request header. (Keep-alive does not stay connected permanently, it has a hold time that can be set in different server software such as Apache.)

HTTP2.0

In HTTP1.1, the multiplexing of connections is serial: one request establishes a TCP connection, and after the request is complete, the next request to the same host continues to use the connection. However, if the client wants to initiate multiple parallel requests at the same time, it must establish multiple TCP connections. This will cause network latency and increase network overhead.

And HTTP1.1 does not compress request and response headers, resulting in unnecessary network traffic; HTTP1.1 does not support resource priorities, resulting in low utilization of underlying TCP connections. These issues are addressed in HTTP2.0, which has the following features:

  • The new Binary Format: HTTP /1.x uses the plaintext protocol, which consists of three parts: The protocol parsing of request line, header, and body is based on text, but there are natural defects in this way. There are various forms of text expression, and many scenarios must be considered in order to achieve robustness. Binary is different, only recognizing the combination of 0 and 1. Based on this consideration, the protocol resolution of HTTP /2.0 was decided to adopt binary format, which is convenient and robust
  • MultiPlexing: MultiPlexing StreamId is used to distinguish requests. A request corresponds to a stream and an ID is assigned. In this way, a TCP connection can have multiple streams, and the frames of each stream can be randomly intermixed. Recipients can assign frames to different requests based on the stream ID
  • Priority and Dependency: Each stream can have its Priority and Dependency set. Streams with higher Priority are processed by the server and returned to the client first. Streams can also depend on other sub Streams. Priorities and dependencies can be dynamically adjusted. For example, when browsing the list of goods on the APP, the user quickly slides to the bottom, but the previous request has been sent. If the priority behind is not set high, the picture currently browsed will be displayed at the end, which is obviously not conducive to user experience
  • Header compression: Http2.0 uses encoder to reduce the size of the headers that need to be transferred. The communication parties cache a header fields table to avoid duplicate header transmission and reduce the size of the transmission
  • Reset the connection: a lot of demand in the APP has to stop the downloading images, for http1. X, is disconnected, directly lead to the next time to send the request must be to establish a connection; Http2.0 introduces a frame of type RST_STREAM, which can be used to cancel a request stream if the connection is continuously open

Two new concepts are involved:

  • Stream-stream: a logical bidirectional byte stream over a TCP connection that carries bidirectional messages corresponding to a request and its response. Each time a client initiates a request, it establishes a data flow through which all subsequent data of the request and its response is transmitted. Each data stream has a unique identifier and optional priority information.
  • The frame to frame: the smallest unit of HTTP/2 data slicing that holds specific types of data, such as HTTP headers, message payloads, and so on. Frames from different data streams can be sent interleaving and then reassembled according to the data stream identifier of each frame header, enabling the effect of multiple requests or responses being transmitted in parallel at a macro level.

The multiplexing mechanism allows multiple requests to be executed in parallel over the same TCP connection.

Whether HTTP1.1’s keep-alive mechanism or HTTP2.0’s multiplexing mechanism, the implementation of connection pooling is required to maintain network connections. Let’s take a look at the connection pooling implementation in OkHttp, ConnectInterceptor.

ConnectInterceptor

The code for the ConnectInterceptor is as follows:

// Open the connection to the target service and process the next interceptor
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    booleandoExtensiveHealthChecks = ! request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    returnrealChain.proceed(request, transmitter, exchange); }}Copy the code

There is very little code, mostly a method called PROCEED on the interceptor chain that gets an Exchange instance using transmitter. NewExchange. Note that the proceed method called by the interceptor analyzed earlier is one-parameter, whereas here it is three-parameter. This is because the next interceptor (the CallServerInterceptor, and the last one if no network interceptor is configured) needs to do real network IO operations, while Exchange (for Exchange) is mainly used for real IO operations: Write request, read response (covered in the next interceptor).

In fact, the logic of retrieving an Exchange instance is encapsulated in Transmitter. As mentioned in the previous article, Transmitter is the “Transmitter”, which transmits the request from the application end to the network layer. It holds the connection, request, response and flow of the request. A request corresponds to a Transmitter instance and a data flow. Take a look at its newExchange method:

  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if(exchange ! =null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }

    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      returnresult; }}Copy the code

If the first request is made, the first two ifs are not entered. You then see that an Exchange Dec instance is obtained using Exchange Finder’s find method, the Exchange instance is built as a parameter, and returned. Well, it looks easy too. Note that this method includes classes such as RealConnectionPool, ExchangeFinder, Exchange codec, and Exchange Management Exchange.

  • RealConnectionPool, a connection pool that manages requested connections, including creation, reuse, and closure, similar in understanding to thread pools.
  • ExchangeCodec, interface class, responsible for the real IO operation – write request, read response, implementation class Http1ExchangeCodec, Http2ExchangeCodec, corresponding to HTTP1.1 protocol, HTTP2.0 protocol respectively.
  • Exchange, which manages IO operations and can be understood as data flows, is a wrapper around Exchange Dec with added event callbacks; Each request corresponds to one Exchange instance. Pass it to the next interceptor, CallServerInterceptor.
  • ExchangeFinder, which looks for available TCP connections (from the connection pool) and returns exchange dec through the connection.

ExchangeFinder

ExchangeFinder, as its name suggests, is essentially an Exchange finder that finds a TCP connection for a request. If a connection is available, use it; if not, open a new one. To execute a network request, a TCP connection pointing to the target service is required before I/O operations such as write request and read response are performed. How is ExchangeFinder found? Keep reading

Let’s start by looking at where exchangeFinder initializes:

  public void prepareToConnect(Request request) {...this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
        call, eventListener);
  }
Copy the code

See here should be thinking of an article in the analysis on RetryAndFollowUpInterceptor mentioned, prepareToConnect effect of this method is to connect to, is to create the ExchangeFinder instance. The main arguments passed in are connectionPool, Address returned by the createAddress method, Call, and eventListener. ConnectionPool connectionPool connectionPool connectionPool connectionPool

  private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }
Copy the code

Create an Address instance using the URL and client configuration. Address means the Address of the connection to the service, which can be interpreted as the request Address and its configuration. Address has an important role: HTTP requests with the same Address share the same connection. This can be used as a judgment for HTTP1.1 and HTTP2.0 reuse connection requests mentioned earlier.

Look back at the find methods of exchangeFinder

  public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      // Find a healthy connection
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      // Instantiate an ExchangeCodec object with a connection, returning Http2ExchangeCodec if it is HTTP/2, or Http1ExchangeCodec otherwise
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw newRouteException(e); }}Copy the code

Basically, the findHealthyConnection method is used to obtain a connected RealConnection instance, and then the RealConnection newCodec method is used to obtain an Exchange Dec instance. Return Http2ExchangeCodec if it is HTTP/2, Http1ExchangeCodec otherwise, and return.

The findHealthyConnection method name implies that it is used to find available TCP connections, and we suspect that the method has a close connection to the ConnectionPool. Follow up with the findHealthyConnection method:

  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      / / to find connections
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If the connection is new and not HTTP2.0, no check is required
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          returncandidate; }}// The physical examination is not healthy, keep looking
      if(! candidate.isHealthy(doExtensiveHealthChecks)) {// The flag is not available
        candidate.noNewExchanges();
        continue;
      }

      returncandidate; }}Copy the code

Loop to find connections: If it is an unhealthy connection, the tag is unavailable (it will be removed after the tag, as discussed later in connection pooling), and then keep looking. Health refers to the connection being able to handle new data streams. The socket is the connection status. Let’s follow up with the findConnection method to find a connection:

  // Find connections to host new data flows. The search sequence is allocated connections, connection pools, and new connections
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      // The request has been cancelled (cancel method of Call -> Cancel method of transmitter), and an exception has been changed
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; 

      // Try to use the connection that has been allocated to the data stream. (For example, when redirecting a request, the connection from the last request can be reused.)
      releasedConnection = transmitter.connection;
      // If a connection has been allocated but has been restricted to handle new data streams, try to release it (if there is no data stream on the connection) and return the socket to be closed.toClose = transmitter.connection ! =null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if(transmitter.connection ! =null) {
        // If it is not empty, the connection is available
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // If there are no available connections allocated, try to get them from the connection pool. (More on connection pooling later)
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null.false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if(nextRouteToTry ! =null) {
          selectedRoute = nextRouteToTry;// There are routes to try
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);// If yes, close the socket to be closed

    if(releasedConnection ! =null) {
      eventListener.connectionReleased(call, releasedConnection);// (if any) callback connection release event
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);// The callback (if any) gets the connection event (from the connection pool)
    }
    if(result ! =null) {
      // If there are any available connections allocated or obtained from the connection pool, end! If not, go through the new connection procedure below.
      return result;
    }

    // If routing information is needed, get it. Yes blocking operation
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null| |! routeSelection.hasNext())) { newRouteSelection =true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that you have the IP address, try again to get it from the connection pool. Possible matches due to join merge. (routes passed, null passed above)
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true; result = transmitter.connection; }}// Create a new connection
      if(! foundPooledConnection) {if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = newRealConnection(connectionPool, selectedRoute); connectingConnection = result; }}// If the second attempt from the pool is successful, it ends because the connection in the pool is already connected to the server
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Set up a connection with the server through TCP + TLS handshake. Yes blocking operation
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());// Remove from the failed list

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // The last attempt is to get from the connection pool. Note that the last argument is true, which requires multiplexing (http2.0).
      If this is HTTP2.0, then in order to ensure multiplexing (since the handshake above is not thread-safe) the pool will be rechecked to see if the same connection exists at this time
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // Close the connection we created and return the obtained connection
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        // Then the newly connected route can be used for the next attempt
        nextRouteToTry = selectedRoute;
      } else {
        // If the last attempt fails, store the newly created connection to the connection pool
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);// Attach link to transmitter
      }
    }
    closeQuietly(socket);// If the connection is not available, close it

    eventListener.connectionAcquired(call, result);
    return result;
  }
Copy the code

The code looks long and has been commented out, and the purpose of the method is to find connections to host the new data stream. The search sequence is allocated connections, connection pools, and new connections. The comb is as follows:

  1. The first attempt is to use the connection that has been assigned to the data flow. (A connection has been allocated, such as a redirection request, indicating that a connection was made last time.)
  2. If there are no available connections allocated, try to match them from the connection pool. Because there is no routing information, the matching conditions are as follows: Address is consistent — host, port, proxy, etc., and the matching connection can accept new data flows.
  3. If it is not obtained from the connection pool, it takes the routing information of a proxy (multiple routes, that is, multiple IP addresses) and tries to obtain the routing information from the connection pool again. In this case, it may be matched because of connection merging.
  4. If not, create a RealConnection instance, perform TCP + TLS handshake, and establish a connection with the server.
  5. At this point, to ensure the multiplexing of Http2.0 connections, a third match is made from the connection pool. Because the handshake process for newly established connections is non-thread-safe, it is possible that the same connection is newly stored in the connection pool.
  6. If a match is found for the third time, the existing connection is used to release the newly created connection. If no match is found, the new connection is pooled and returned.

The flow chart is as follows:

See here, kid, do you have a lot of question marks?

  • Initially, if you have an allocated connection, but it has been restricted to carry new data flows, how do you release it?
  • How is proxy routing information obtained?
  • How do I get connections from the connection pool? What difference does three times make?

It doesn’t matter. Take your time. Let’s look at the second question mark, proxy routing information retrieval.

RouteSelector

Let’s start with the Route class:

public final class Route {
  final Address address;
  final Proxy proxy;/ / agent
  final InetSocketAddress inetSocketAddress;// Connect to the destination address

  public Route(Address address, Proxy proxy, InetSocketAddress inetSocketAddress) {...this.address = address;
    this.proxy = proxy;
    this.inetSocketAddress = inetSocketAddress;
  }
Copy the code

Route: describes a specific Route to the server through proxy and InetSocketAddress.

  • Proxy Proxy: You can explicitly configure a proxy server for the client. Otherwise, the ProxySelector ProxySelector is used. Multiple proxies may be returned.
  • IP address: An IP address is required to open socket connections, whether direct or proxy. DNS service may return multiple IP address attempts.

In the findConnection method, routesElector.getall () is used to retrieve the routes set, while routeSelection is obtained via routesElector.next (). RouteSelector create inside the constructor of ExchangeFinder, i.e. routeSelector created in RetryAndFollowUpInterceptor, then we see routeSelector:

  RouteSelector(Address address, RouteDatabase routeDatabase, Call call,
      EventListener eventListener) {
    this.address = address;
    this.routeDatabase = routeDatabase;// Blacklist of routes in the connection pool (failed routes)
    this.call = call;
    this.eventListener = eventListener;

    resetNextProxy(address.url(), address.proxy());
  }
  // Collect proxy servers
  private void resetNextProxy(HttpUrl url, Proxy proxy) {
    if(proxy ! =null) {
      // If the proxy is specified, then this one is available. (which is configured when OkhttpClient is initialized)
      proxies = Collections.singletonList(proxy);
    } else {
      // If the OkhttpClient is initialized without ProxySelector, the default will be used.List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri()); proxies = proxiesOrNull ! =null && !proxiesOrNull.isEmpty()
          ? Util.immutableList(proxiesOrNull)
          : Util.immutableList(Proxy.NO_PROXY);
    }
    nextProxyIndex = 0;
  }
Copy the code

Note that the RouteSelector constructor passes in the routeDatabase, which is a blacklist of routes that failed to connect (more on connection pooling later), and uses the resetNextProxy method to get the list of proxy servers: If no proxy is specified, the proxy list is obtained using ProxySelector. (If no ProxySelector is configured, the system default is used.) Next method:

  // Collect proxy routing information
  public Selection next(a) throws IOException {
    if(! hasNext()) {// There is the next agent
      throw new NoSuchElementException();
    }

    List<Route> routes = new ArrayList<>();
    while (hasNextProxy()) {
      Proxy proxy = nextProxy();
      // Route is assembled by traversing all DNS IP addresses of proxy
      for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) {
        Route route = new Route(address, proxy, inetSocketAddresses.get(i));
        if (routeDatabase.shouldPostpone(route)) {// This route is in the blacklist and saved for last attempt
          postponedRoutes.add(route);
        } else{ routes.add(route); }}if(! routes.isEmpty()) {break; }}if (routes.isEmpty()) {
      // If no route is found, try the route listed above
      routes.addAll(postponedRoutes);
      postponedRoutes.clear();
    }
    //routes returns wrapped in Selection
    return new Selection(routes);
  }
Copy the code

The next method mainly obtains the Proxy information of the next Proxy, that is, multiple routes. Concrete is implemented in resetNextInetSocketAddress method, mainly to DNS for multiple proxy service address IP address, here is not opened, the concrete can be reference to the agency and OkHttp routing.

Okay, so that’s the second question mark. The other two question marks relate to the connection pool RealConnectionPool and the Connection RealConnection.

ConnectionPool

ConnectionPool, or ConnectionPool, is used to manage http1.1/http2.0 connection reuse to reduce network latency. HTTP requests with the same Address can share a connection, and a ConnectionPool implements connection multiplexing.

public final class ConnectionPool {
  final RealConnectionPool delegate;
  // The maximum number of idle connections is 5 and the maximum idle time is 5 minutes
  public ConnectionPool(a) {
    this(5.5, TimeUnit.MINUTES);
  }
  
  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
  }
  // Return the number of free connections
  public int idleConnectionCount(a) {
    return delegate.idleConnectionCount();
  }
  // Returns the number of connections in the pool
  public int connectionCount(a) {
    return delegate.connectionCount();
  }
  // Close and remove all idle connections
  public void evictAll(a) { delegate.evictAll(); }}Copy the code

ConnectionPool looks easy to understand. The default configuration is a maximum number of idle connections of 5 and a maximum idle time of 5 minutes (i.e., a connection that is idle for more than 5 minutes is removed). We can also configure this differently when initializing okhttpClient. Note that the ConnectionPool is for the application layer and the actual manager is RealConnectionPool. RealConnectionPool is where connections are actually managed within OKHTTP.

The connection pool manages connections only by saving, retrieving, and deleting. The two question marks above are corresponding to deleting and retrieving respectively.

save

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  
  private final Runnable cleanupRunnable = () -> {
    // Loop cleanup
    while (true) {
      / / clean up
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            // Wait until the next cleanup
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
  / / save
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if(! cleanupRunning) { cleanupRunning =true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
Copy the code

Connections is a queue Deque for holding connections. See that the thread pool executor performed cleanupRunnable before the add, which means to cleanup the connection. As mentioned above, the connection pool has a limit on the maximum number of free connections and the maximum idle time, so it is necessary to clean up if the limit is not met. And notice that cleaning is a cycle and waiting for waitNanos before cleaning again. Let’s look at the cleanup method:

  long cleanup(long now) {
    int inUseConnectionCount = 0;// The number of connections in use
    int idleConnectionCount = 0;// Number of idle connections
    RealConnection longestIdleConnection = null;// The connection with the longest idle time
    long longestIdleDurationNs = Long.MIN_VALUE;// The longest idle time

    // Walk through the connection: find the connection to clean up, find the next time to clean up (has not reached the maximum idle time)
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, continue, the number of connections in use +1
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
		// Number of idle connections +1
        idleConnectionCount++;

        // Assign the maximum idle time and the corresponding connection
        long idleDurationNs = now - connection.idleAtNanos;
        if(idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs; longestIdleConnection = connection; }}If the maximum idle time is greater than 5 minutes or the number of idle times is greater than 5, remove and close the connection
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // else, return how much time is left to arrive 5 minutes, then wait this time to clean up again
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // If there is no free connection, try to clean up after 5 minutes.
        return keepAliveDurationNs;
      } else {
        // No connection, no cleanup
        cleanupRunning = false;
        return -1; }}// Close the removed connection
    closeQuietly(longestIdleConnection.socket());

    // Close remove immediately after the next attempt to clean up
    return 0;
  }
Copy the code

The idea is clear:

  • If there are idle connections, if the longest idle time is greater than 5 minutes or the number of idle connections is greater than 5, remove and close the longest idle connections. If the idle number is less than 5 and the maximum idle time is less than 5 minutes, go back to the remaining 5 minutes and wait for that time to clean up.
  • If there are no free connections, wait 5 minutes and then try to clean up.
  • No connection does not clean up.

The judgment method of connection is using pruneAndGetAllocationCount we’ll look at:

  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    // Data flow on connection, weak reference list
    List<Reference<Transmitter>> references = connection.transmitters;
    for (int i = 0; i < references.size(); ) {
      Reference<Transmitter> reference = references.get(i);
      if(reference.get() ! =null) {
        i++;
        continue;
      }

      // To this point, the transmitter is leaked, which needs to be removed, and this connection can no longer carry new data flow (the reason for the leakage is the following message).
      TransmitterReference transmitterRef = (TransmitterReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
      references.remove(i);
      connection.noNewExchanges = true;

      // The connection has no data flow due to leakage, so it can be removed immediately. So set the start idle time to 5 minutes before (cool!)
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0; }}// Returns the number of data streams on the connection, if greater than 0, it is in use.
    return references.size();
  }
Copy the code

The logical comments are indicated and are easy to understand. For CCD whose size is greater than 1, it indicates the data flow on the connection.

In addition, in findConnection, use connectionPool. Put (result) connection, and call transmitter. AcquireConnectionNoEvents method, Chou:

  void acquireConnectionNoEvents(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection ! =null) throw new IllegalStateException();
    this.connection = connection;
    connection.transmitters.add(new TransmitterReference(this, callStackTrace));
  }
Copy the code

First, attach the connection to the transmitter, indicating that the data flow transmitter is attached to the connection; For high-speed contact not short enough, the fpga adds a weak reference to the transmitter, for which connection.

Ok, so that’s it. Basically, you queue connections, and you start a cycle of trying to clean up expired connections.

take

  // Obtain the connection corresponding to address from the connection pool for TRANSMITTER. If routes are not empty, you may get an HTTP/2 connection because of connection merge (reuse).
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if(requireMultiplexed && ! connection.isMultiplexed())continue;
      if(! connection.isEligible(address, routes))continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }
Copy the code

Save the method name is put, but you find in the name of the method is not a get, transmitterAcquirePooledConnection mean for the transmitter from the connection pool for connection, actually represents a data flow transmitter, That’s an HTTP request. Note that in the traversal After judgment is also a transmitter acquireConnectionNoEvents method, namely to match to the assignment of the connection to the transmitter. So the method names are pretty vivid.

If requireMultiplexed is false, the isEligible method returns true, which indicates that the match was successful:

  // Determine whether the connection can host data flows to address
  boolean isEligible(Address address, @Nullable List<Route> routes) {
    // The connection does not accept new data streams, false
    if (transmitters.size() >= allocationLimit || noNewExchanges) return false;

    // Match the non-host part of address
    if(! Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // Match address's host, return true
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    // There is no hostname match up to this point, but there is still a chance to return true: join merge
    1. The connection must be HTTP/2.
    if (http2Connection == null) return false;

    // 2. The IP address matches
    if (routes == null| |! routeMatchesAny(routes))return false;

    // 3. Certificate matching
    if(address.hostnameVerifier() ! = OkHostnameVerifier.INSTANCE)return false;
    if(! supportsUrl(address.url()))return false;

    // 4. Certificates pinning match
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

  private boolean routeMatchesAny(List<Route> candidates) {
    for (int i = 0, size = candidates.size(); i < size; i++) {
      Route candidate = candidates.get(i);
      if (candidate.proxy().type() == Proxy.Type.DIRECT
          && route.proxy().type() == Proxy.Type.DIRECT
          && route.socketAddress().equals(candidate.socketAddress())) {
        return true; }}return false;
  }
Copy the code

I’m just going to go through the pool, do a bunch of matching, address matching, and so on, and I’ve solved the third question mark here.

delete

  // Remove to close idle connections
  public void evictAll(a) {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.transmitters.isEmpty()) {
          connection.noNewExchanges = true; evictedConnections.add(connection); i.remove(); }}}for(RealConnection connection : evictedConnections) { closeQuietly(connection.socket()); }}Copy the code

Traverses the connection pool, and if the data flow on the connection is empty, it is removed from the pool and closed.

We retrospect the Transmitter releaseConnectionNoEvents method, also is the first question, if the connection is no longer accepting new data flow, would call this method:

  // Remove transmitter from connection.
  @Nullable Socket releaseConnectionNoEvents(a) {
    assert (Thread.holdsLock(connectionPool));

    int index = -1;
    // Find index by traversing all data streams on the connection to which this data stream is attached
    for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
      Reference<Transmitter> reference = this.connection.transmitters.get(i);
      if (reference.get() == this) {
        index = i;
        break; }}if (index == -1) throw new IllegalStateException();
	For CCD to remove the data flow
    RealConnection released = this.connection;
    released.transmitters.remove(index);
    this.connection = null;
	// If there is no more data streaming on the connection, it is idle (waiting to be cleaned up) and returns the socket to be closed
    if (released.transmitters.isEmpty()) {
      released.idleAtNanos = System.nanoTime();
      if (connectionPool.connectionBecameIdle(released)) {
        returnreleased.socket(); }}return null;
  }
Copy the code

The main idea is to try to release the connection, close the socket if there is no data flow on the connection and wait to be cleaned up.

Well, that concludes the analysis of connection pool management.

From connection lookup to connection pool management, that’s what the ConnectInterceptor is all about.

CallServerInterceptor

Oops, finally the last interceptor!

The request service interceptor actually reads and writes network IO — writing the HEADER and body of the HTTP request and reading the header and body of the response.

The ConnectInterceptor above mainly describes how to find connections and how connection pools manage them. After getting the connection, RealConnection’s newCodec method ExchangeCodec instance is called, then Exchange instance is created using ExchangeCodec instance and passed in the CallServerInterceptor. As mentioned above, Exchange Dec is responsible for IO reads and writes for requests and responses. Let’s take a look at the Exchange Dec creation process — RealConnection’s newCodec method:

  ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
    if(http2Connection ! =null) {
      return new Http2ExchangeCodec(client, this, chain, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1ExchangeCodec(client, this, source, sink); }}Copy the code

Create Http2ExchangeCodec if http2Connection is not empty, or Http1ExchangeCodec if not. EstablishProtocol (); http2Connection: establishProtocol (); http2Connection: establishProtocol ();

  private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    // For HTTP requests, an Http2 connection is opened if the configured Protocol contains protocol.h2_prior_knowledge
    if (route.address().sslSocketFactory() == null) {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
        startHttp2(pingIntervalMillis);
        return;
      }

      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;
      return;
    }
	// For HTTPS requests, the Protocol () is obtained based on the platform after the TLS handshake. If the Protocol is protocol.http_2, the Http2 connection is opened
    eventListener.secureConnectStart(call);
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);

    if(protocol == Protocol.HTTP_2) { startHttp2(pingIntervalMillis); }}private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }
Copy the code

Well, I won’t go into details here, but you can refer to HTTP 2.0 and OkHttp for further information. So at this point, Exchange Dec is created, then wrapped as Exchange, and finally passed in the CallServerInterceptor.

Here’s a look at the last interceptor:

public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();// The exchange passed in by the last interceptor
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
	// Write the request header
    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    // Request with body
    if(HttpMethod.permitsRequestBody(request.method()) && request.body() ! =null) {
      // If the request header contains "Expect: 100-continue", wait for the server to return a response with "HTTP/1.1 100 continue" before sending the request body.
      // If the response is not received (such as 4xx), the body is not sent.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }
	  // the responseBuilder is null, indicating that the server has returned 100 and is ready to continue sending the body
      if (responseBuilder == null) {
        if (request.body().isDuplex()) {// The default is false
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // If "Expect: 100-continue" is satisfied, write request body
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false)); request.body().writeTo(bufferedRequestBody); bufferedRequestBody.close(); }}else {
       // If "Expect: 100-continue" is not met, the request is sent
        exchange.noRequestBody();
        if(! exchange.connection().isMultiplexed()) {// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.exchange.noNewExchangesOnConnection(); }}}else {
     // Without body, the request is sent
      exchange.noRequestBody();
    }

	// The request is sent
    if (request.body() == null| |! request.body().isDuplex()) { exchange.finishRequest(); }// Callback reads the response header start event (if none above)
    if(! responseHeadersStarted) { exchange.responseHeadersStart(); }// Read the response header (if there is none above)
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }
	/ / build the response
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // The server returns another 100 and tries to get the real response ()
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }
	// The callback reads the response header
    exchange.responseHeadersEnd(response);
	// Get the response body
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }
	// Connection is close in the request header, indicating that the Connection will be closed after the request completes
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }
	//204 (no content), 205 (recharge content), body should be empty
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
	
    returnresponse; }}Copy the code

As you can see, the entire content is just that: the header and body for writing the HTTP request, and the header and body for reading the response. I won’t explain it here.

Here we can see that both the request and the response are written using the corresponding Exchange method. As mentioned above, Exchange is understood to be a wrapper around Exchange Dec. In addition to the event callback and some parameter fetching, the core work is done by Exchange dec objects, and Exchange ecodec actually uses Okio. Okio actually uses sockets.

The implementation classes for Exchange Ecodec are http1Exchange Ecodec for Http1.1 and Http2Exchange ecodec for Http2.0. Where HTTP2Exchange Dec uses the concept of data frames in Http2.0 to complete the read and write of the request response. About http1Exchange DEC, http2Exchange ECOdec specific implementation principle involves OKIO which will not be expanded.

Last but not least, the Intercept method of the CallServerInterceptor does not call the proceed method of the connector Chain because it is the last interceptor.

Well, that’s it for the last interceptor!

conclusion

This article analyzes the functions and principles of ConnectInterceptor and CallServerInterceptor. The ConnectInterceptor is responsible for getting connections, which involves the concept of connection pools. The CallServerInterceptor is a true NETWORK I/O read and write. The ConnectInterceptor is the core of Okhttp. In combination with the previous article, we have analyzed all the interceptors within Okhttp, and finally presented the overall architecture of Okhttp.

At this point, the source code parsing part of Okhttp is really over. It’s been a long process! With these four articles, from usage to workflow to specific interceptors, you should be familiar with Okhttp. There is also a fifth final chapter planned to cover some of the most frequently asked questions and advanced uses of Okhttp, so stay tuned!


Thanks and Reference:

Okhttp source code parsing

Connect to OKHttp3 ConnectInterceptor

OkHttp source deep parsing

OkHttp source code parsing (3) — Proxy and routing

Data interchange stream HTTPCodec

Last but not least, welcome to leave a comment, if you like this series, or if you think it is well written, please help to like, favorites and forward, thank you!

Welcome to my public account: