Hi, I’m Halliday. This is an introduction to the open source networking library OkHttp. It works from the trunk out to the twigs and analyzes some of the techniques used in the framework.

This article is about 3,800 words and takes about 10 minutes to read. If any of the large images look blurry, you can read it on my personal site.

Overview

The source code analyzed here is version 3.14.9, the latest release of the Java version.

First, the responsibility diagram. Each class name is more or less self-explanatory, so I won’t translate them. Let’s take off ~

The trunk

Let’s first look at the flight as a whole,

Okay, so let’s go into the code, introduce the dependencies,

implementation 'com.squareup.okhttp3:okhttp:3.14.9'

Basic usage (only asynchronous requests are analyzed; synchronous requests work similarly),

class OkhttpActivity extends AppCompatActivity {
    // Create an airport, usually a singleton
    OkHttpClient mClient = new OkHttpClient();

    void onCreate(Bundle savedInstanceState) {
        String url = "xxx";
        // The builder pattern creates the Request Request and sets the URL (where to fly).
        Request request = new Request.Builder().url(url).build();
        // After knowing the destination, create a Call session (for this flight)
        Call call = mClient.newCall(request);
        // Asynchronously request to join (the plane enters the ready runway)
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                // This flight has failed
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                // Arrive destination!
                // The body can be read only once and the Response is closed afterwards, so store it in a local variable
                String result = response.body().string();
                // The callback runs on a worker thread; switch back to the main thread to touch the UI
                runOnUiThread(() -> { mBinding.tv.setText(result); });
            }
        });
    }
}

Both OkHttpClient and Request can be created with the builder pattern; if OkHttpClient needs no configuration, you can simply call new. Once the origin and destination are known, you can create a flight, the Call,

//OkHttpClient.java
Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false);
}

//RealCall.java
RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    // The Transmitter, which we will call the captain
    call.transmitter = new Transmitter(client, call);
    return call;
}

As you can see, the Call instance is a RealCall. Once the flight has been created, it enters the ready runway,

//RealCall.java
void enqueue(Callback responseCallback) {
    // The captain calls back eventListener to report the status of the flight in real time
    transmitter.callStart();
    // Wrap the Callback in an AsyncCall; the airport dispatcher moves it onto the ready runway
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

AsyncCall is a Runnable, and the run method calls the execute method,

//AsyncCall.java
void execute() {
    try {
        // Get the Response: the flight has reached its destination
        Response response = getResponseWithInterceptorChain();
        // onResponse fires for any HTTP response; only response.isSuccessful() means real success
        responseCallback.onResponse(RealCall.this, response);
    } catch (IOException e) {
        // Failure
        responseCallback.onFailure(RealCall.this, e);
    } catch (Throwable t) {
        cancel();
        IOException canceledException = new IOException("canceled due to " + t);
        canceledException.addSuppressed(t);
        // Failure
        responseCallback.onFailure(RealCall.this, canceledException);
        throw t;
    } finally {
        // End the flight: callsPerHost is decremented and the AsyncCall is removed from runningAsyncCalls
        client.dispatcher().finished(this);
    }
}

AsyncCall has an atomic counter,

// Number of calls currently running for this host (domain name)
volatile AtomicInteger callsPerHost = new AtomicInteger(0);

There are two default Max values in Dispatcher,

int maxRequests = 64;  // The maximum number of simultaneous requests is 64
int maxRequestsPerHost = 5;  // The maximum number of simultaneous requests per host is 5

What does this mean? Think of the airport’s control center: at most 64 flights may be in the air at the same time, and at most 5 of them may be heading to the same city. Why limit flights per city? It has to do with connection pool reuse, which we’ll discuss later. Let’s take Shanghai as an example.
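To make the two limits concrete, here is a minimal sketch that models only the bookkeeping. It is not OkHttp's Dispatcher: the class DispatcherLimitsSketch, its fields, and the host names are hypothetical, hosts are plain strings, and nothing is actually executed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two limits; each call is represented by its host name only.
class DispatcherLimitsSketch {
    final int maxRequests;
    final int maxRequestsPerHost;
    final Deque<String> ready = new ArrayDeque<>();        // ready runway
    final List<String> running = new ArrayList<>();        // take-off runway
    final Map<String, Integer> perHost = new HashMap<>();  // flights in the air per city

    DispatcherLimitsSketch(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    // Queue a call for the host, then promote whatever the limits allow.
    void enqueue(String host) {
        ready.add(host);
        promote();
    }

    // Mark one running call for the host as finished, then promote waiting calls.
    void finished(String host) {
        if (running.remove(host)) {
            perHost.merge(host, -1, Integer::sum);
            promote();
        }
    }

    private void promote() {
        for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
            String host = i.next();
            if (running.size() >= maxRequests) break;                          // global limit reached
            if (perHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host limit reached
            i.remove();
            perHost.merge(host, 1, Integer::sum);
            running.add(host);
        }
    }

    public static void main(String[] args) {
        DispatcherLimitsSketch d = new DispatcherLimitsSketch(64, 5);
        for (int n = 0; n < 7; n++) d.enqueue("shanghai.example");
        d.enqueue("beijing.example");
        // Five Shanghai flights and the Beijing flight take off; two Shanghai flights wait.
        System.out.println("running=" + d.running.size() + " ready=" + d.ready.size()); // prints running=6 ready=2
    }
}
```

Note that the Beijing flight is not held up by the two waiting Shanghai flights: a host that has reached its limit is skipped, not treated as a blocker.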

Let’s see what the enqueue method does,

//Dispatcher.java
void enqueue(AsyncCall call) {
    synchronized (this) {
        // The aircraft enters the ready runway
        readyAsyncCalls.add(call);
        if (!call.get().forWebSocket) {
            // Find an existing AsyncCall that is also flying to Shanghai
            AsyncCall existingCall = findExistingCallWithHost(call.host());
            // Reuse its callsPerHost counter so flights to the same city share one count
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
    }
    // The aircraft enters the take-off runway
    promoteAndExecute();
}

Follow up promoteAndExecute,

//Dispatcher.java
boolean promoteAndExecute() {
    // Collect AsyncCall that can be executed
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall asyncCall = i.next();
            // All 64 take-off runways are full: stop
            if (runningAsyncCalls.size() >= maxRequests) break;
            // There are already 5 flights to Shanghai: leave this one on the ready runway and skip it
            if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue;
            // Leave the ready runway
            i.remove();
            // Shanghai flight counter +1
            asyncCall.callsPerHost().incrementAndGet();
            // Save AsyncCall
            executableCalls.add(asyncCall);
            // Enter the take-off runway
            runningAsyncCalls.add(asyncCall);
        }
        isRunning = runningCallsCount() > 0;
    }
    // Take off: execute every AsyncCall that was promoted
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        asyncCall.executeOn(executorService());
    }
    return isRunning;
}

executorService() returns a thread pool,

//Dispatcher.java
synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService =
            new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                   60, TimeUnit.SECONDS,
                                   new SynchronousQueue<>(),
                                   Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

The core pool size is 0, so every thread is reclaimed after it has been idle for 60 seconds. The pool itself places no limit on the number of threads, but the Dispatcher already limits the number of concurrent requests.
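The effect of combining a core size of 0 with a SynchronousQueue can be checked with the JDK alone. The sketch below builds a pool with the same constructor arguments (but the default thread factory instead of OkHttp's) and counts how many threads it creates for a batch of blocking tasks. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DispatcherPoolSketch {
    // Same sizing and queue as the dispatcher pool, without OkHttp's thread factory.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60, TimeUnit.SECONDS,
                                      new SynchronousQueue<>());
    }

    // Submits `tasks` jobs that block until released and reports how many threads the pool holds.
    static int threadsUsedFor(int tasks) {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // A SynchronousQueue holds nothing, and no worker is idle, so every task got its own thread.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads for 8 blocked tasks: " + threadsUsedFor(8));
    }
}
```

Because the queue never buffers work, the only thing that keeps the thread count bounded is whoever submits the tasks, which in OkHttp is the Dispatcher with its two limits.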

Follow up on the executeOn method,

//AsyncCall.java
void executeOn(ExecutorService executorService) {
    boolean success = false;
    try {
        // The thread pool runs this Runnable: run() calls the AsyncCall.execute shown earlier
        executorService.execute(this);
        success = true;
    } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        // Fail callback
        responseCallback.onFailure(RealCall.this, ioException);
    } finally {
        if (!success) {
            // End the flight
            client.dispatcher().finished(this);
        }
    }
}

As you can see, the callback runs on a worker thread, so the Activity has to switch back to the main thread before touching the UI. That completes the core flow.
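The thread hop can be illustrated without Android. In this sketch two single-thread executors stand in for the dispatcher's worker thread and the main thread; both, along with the class name, are assumptions made for illustration, and on Android the second hop would be runOnUiThread as in the usage example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadSketch {
    // Runs a fake request on a worker thread, then hops to a fake main thread for the "UI" update.
    // Returns {thread that ran the callback, thread that ran the UI update}.
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        CompletableFuture<String[]> done = new CompletableFuture<>();
        try {
            worker.execute(() -> {
                // This is where onResponse would run: still on the worker thread.
                String callbackThread = Thread.currentThread().getName();
                // Hand the result over to the other thread before touching any UI state.
                ui.execute(() -> done.complete(
                    new String[] {callbackThread, Thread.currentThread().getName()}));
            });
            return done.join();
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("callback on " + names[0] + ", UI update on " + names[1]);
    }
}
```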

The twigs

The interceptor chain

Earlier, the Response was obtained from getResponseWithInterceptorChain. Let’s go in and take a look,

//RealCall.java
Response getResponseWithInterceptorChain() throws IOException {
    List<Interceptor> interceptors = new ArrayList<>();
    // Add the custom application interceptors
    interceptors.addAll(client.interceptors());
    // Add the default interceptors
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        // Add the custom network interceptors (after ConnectInterceptor, so the network connection is ready)
        interceptors.addAll(client.networkInterceptors());
    }
    // Add the last default interceptor, 4 + 1 = 5 in total
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // Create the interceptor chain
    Interceptor.Chain chain =
        new RealInterceptorChain(interceptors, transmitter, null, 0,
                                 originalRequest, this, client.connectTimeoutMillis(),
                                 client.readTimeoutMillis(), client.writeTimeoutMillis());
    // Proceed down the chain
    Response response = chain.proceed(originalRequest);
    return response;
}

The interceptor chain is based on the chain of responsibility pattern: each interceptor has its own responsibility, and the interceptors on the chain are processed one by one. Custom logic can be inserted before the Request is sent and before the Response is returned, so new requirements are easy to add. The pattern also has drawbacks: if any one link blocks, the whole chain slows down (efficiency), and the longer the chain, the more intermediate objects are created (memory).
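Stripped of everything network-related, the pattern fits in a few lines. The sketch below is a hypothetical stand-in, not OkHttp's classes: SketchInterceptor and SketchChain are invented names, and requests and responses are plain strings so the flow is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an interceptor: sees the chain, returns a "response".
interface SketchInterceptor {
    String intercept(SketchChain chain);
}

// Hypothetical stand-in for the chain: a list of interceptors plus the current position.
class SketchChain {
    private final List<SketchInterceptor> interceptors;
    private final int index;
    private final String request;

    SketchChain(List<SketchInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    // Hands the request to the interceptor at `index`, giving it a chain that points at index + 1.
    String proceed(String request) {
        SketchChain next = new SketchChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

class InterceptorChainSketch {
    static String run(String request) {
        List<SketchInterceptor> interceptors = new ArrayList<>();
        // Rewrites the request on the way down and wraps the response on the way back up.
        interceptors.add(chain -> "[" + chain.proceed(chain.request() + "+header") + "]");
        // Last link: produces the response and never calls proceed.
        interceptors.add(chain -> "response-to(" + chain.request() + ")");
        return new SketchChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /")); // prints [response-to(GET /+header)]
    }
}
```

Each interceptor decides whether and when to call proceed, which is what lets one link short-circuit the chain (a cache hit) or call it several times (a retry).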

Now let’s look at the proceed method,

//RealInterceptorChain.java
Response proceed(Request request, Transmitter transmitter,Exchange exchange)
    throws IOException {
    // Pass index + 1 to access the next interceptor
    RealInterceptorChain next = 
        new RealInterceptorChain(interceptors, transmitter, exchange,
                                 index + 1, request, call, connectTimeout, 
                                 readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    // Run the interceptor at the current index, passing it next
    Response response = interceptor.intercept(next);
    // After all interceptors are processed, we can return Response
    return response;
}

Below is a brief analysis of the capabilities of each interceptor.

1. RetryAndFollowUpInterceptor:

Responsible for retries and redirects.

// Maximum number of follow-ups (redirects and auth challenges)
static final int MAX_FOLLOW_UPS = 20;

Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();
    int followUpCount = 0;
    while (true) {
        // The captain prepares a connection for the Request
        // If the host, port, and protocol are the same, the connection can be reused
        transmitter.prepareToConnect(request);
        // Proceed to the next interceptor
        Response response = realChain.proceed(request, transmitter, null);
        // The later interceptors have run and produced the Response; inspect it to see whether
        // a retry or redirect is needed, and get a new Request if so
        Request followUp = followUpRequest(response, route);
        if (followUp == null) {
            // No new Request is needed, so return the response directly
            return response;
        }
        RequestBody followUpBody = followUp.body();
        if (followUpBody != null && followUpBody.isOneShot()) {
            // The RequestBody exists and may only be sent once, so return the response
            return response;
        }
        if (++followUpCount > MAX_FOLLOW_UPS) {
            // Maximum number of follow-ups exceeded, give up
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
        }
        // Use the follow-up as the new request and continue the loop
        request = followUp;
    }
}

followUpRequest handles the Response according to its status code; we won’t follow it here.
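The shape of that loop is easier to see without the networking. In the hypothetical sketch below a map of redirects plays the role of the server and of followUpRequest: a URL with an entry needs a follow-up, a URL without one is the final response. Only the counter logic mirrors the snippet above; FollowUpSketch and fetch are invented names.

```java
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;

class FollowUpSketch {
    static final int MAX_FOLLOW_UPS = 20;

    // `redirects` maps a URL to the URL it redirects to; a URL without an entry is a final response.
    static String fetch(String url, Map<String, String> redirects) throws ProtocolException {
        int followUpCount = 0;
        while (true) {
            String followUp = redirects.get(url); // stands in for followUpRequest(response, route)
            if (followUp == null) {
                return url; // nothing to follow, hand back this "response"
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            url = followUp; // loop again with the follow-up
        }
    }

    public static void main(String[] args) throws ProtocolException {
        Map<String, String> redirects = new HashMap<>();
        redirects.put("http://a.example", "http://b.example");
        redirects.put("http://b.example", "https://b.example");
        System.out.println(fetch("http://a.example", redirects)); // prints https://b.example
    }
}
```

A redirect cycle never produces a null follow-up, so the counter is the only thing that ends the loop in that case.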

2. BridgeInterceptor:

The bridge turns application requests into network requests and network responses into application responses. It mainly handles the network headers, which simplifies the application-layer logic.

Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    RequestBody body = userRequest.body();
    if (body != null) {
        MediaType contentType = body.contentType();
        if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
        }
        // Handle Content-Length and Transfer-Encoding
        // ...
    }
    // Handle Host, Connection, Accept-Encoding, Cookie, and User-Agent
    // ...
    // Pass the new request down the chain and get the Response
    Response networkResponse = chain.proceed(requestBuilder.build());
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
    // Handle Content-Encoding, Content-Length, Content-Type, and gzip for the new Response
    // Return a new Response
    return responseBuilder.build();
}

If the client does not set Accept-Encoding itself, OkHttp automatically adds Accept-Encoding: gzip and decompresses the response. If the client sets the gzip header itself, it must decompress the data returned by the server on its own.
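For the second case, the decompression step the caller takes on can be sketched with the JDK's java.util.zip classes. This is not OkHttp's code (OkHttp does this with Okio); GzipSketch and its methods are hypothetical helpers, and gzip() is only there to produce test input in place of a server.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipSketch {
    // Plays the server: compresses a body the way a gzip-encoded response would be.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // What the caller must do itself once it has asked for gzip explicitly.
    static String gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gz.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = gzip("hello from Shanghai");
        System.out.println(gunzip(body)); // prints hello from Shanghai
    }
}
```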

3. CacheInterceptor:

Responsible for managing the cache, using Okio to read and write from the cache.

InternalCache cache;

Response intercept(Chain chain) throws IOException {
    // Get the candidate cached response
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;
    // Create the cache strategy
    CacheStrategy strategy = 
        new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    // Network request
    Request networkRequest = strategy.networkRequest;
    // Cached response
    Response cacheResponse = strategy.cacheResponse;
    // If both the network request and the cached response are null
    if (networkRequest == null && cacheResponse == null) {
        // Return a 504 Response
        return new Response.Builder().code(504).xxx.build();
    }
    // If the network is not needed, return the cached response directly
    if (networkRequest == null) {
        return cacheResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse)).build();
    }
    // Proceed down the chain and wait for the network response
    Response networkResponse = chain.proceed(networkRequest);
    if (cacheResponse != null) {
        // The server answered 304 Not Modified, so the cached response is still valid
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder().xxx.build();
            // Update the cache and return
            cache.update(cacheResponse, response);
            return response;
        }
    }
    // Build the Response from the network
    Response response = networkResponse.newBuilder().xxx.build();
    // Write to cache, return
    cache.put(response);
    return response;
}

CacheStrategy is covered in the cache section below.
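The branching in the interceptor above boils down to a small decision table over two nullable values and one status code. The sketch below restates it with hypothetical names (CacheDecisionSketch, Action, decide); the booleans stand for "strategy.networkRequest != null" and "strategy.cacheResponse != null".

```java
class CacheDecisionSketch {
    enum Action { FAIL_504, USE_CACHE, CONDITIONAL_HIT, USE_NETWORK }

    // needsNetwork ~ strategy.networkRequest != null
    // hasCache     ~ strategy.cacheResponse != null
    // networkCode  ~ status code of the network response (ignored when the network is not used)
    static Action decide(boolean needsNetwork, boolean hasCache, int networkCode) {
        if (!needsNetwork && !hasCache) return Action.FAIL_504;            // nothing usable at all
        if (!needsNetwork) return Action.USE_CACHE;                        // cache is enough
        if (hasCache && networkCode == 304) return Action.CONDITIONAL_HIT; // server confirmed the cache
        return Action.USE_NETWORK;                                         // fresh network response
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, 0));  // prints FAIL_504
        System.out.println(decide(false, true, 0));   // prints USE_CACHE
        System.out.println(decide(true, true, 304));  // prints CONDITIONAL_HIT
        System.out.println(decide(true, true, 200));  // prints USE_NETWORK
    }
}
```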

4. ConnectInterceptor:

Responsible for creating Connection.

Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // The captain creates an Exchange
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
    // Pass to the next interceptor
    return realChain.proceed(request, transmitter, exchange);
}

The newExchange method is expanded in the Connection pooling section.

5. CallServerInterceptor:

Responsible for writing requests and reading responses.

Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();
    // Write the request header
    exchange.writeRequestHeaders(request);
    Response.Builder responseBuilder = null;
    // Handle the request body...
    // Read the response header
    responseBuilder = exchange.readResponseHeaders(false);
    // Build the response
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    // Read the response body
    response = response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build();
    return response;
}

The cache

Caching is implemented on top of the request and response headers. The caching policy is represented by CacheStrategy: CacheInterceptor decides whether to use the network or the cache based on the networkRequest and cacheResponse it gets from the strategy.

//CacheStrategy.java
// The internal class factory produces the CacheStrategy
static class Factory {
    // Some fields: servedDate, lastModified, Expires, etag...
    Factory(long nowMillis, Request request, Response cacheResponse) {
        this.nowMillis = nowMillis;
        this.request = request;
        this.cacheResponse = cacheResponse;
        if (cacheResponse != null) {
            // Parse cacheResponse and copy its values into the factory's fields
            this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
            // ...
            Headers headers = cacheResponse.headers();
            for (int i = 0, size = headers.size(); i < size; i++) {
                String fieldName = headers.name(i);
                String value = headers.value(i);
                if ("Date".equalsIgnoreCase(fieldName)) {
                    servedDate = HttpDate.parse(value);
                    servedDateString = value;
                } else if (xxx) {
                    // ...
                }
            }
        }
    }

    CacheStrategy get() {
        CacheStrategy candidate = getCandidate();
        if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
            // The network is forbidden (only-if-cached) and the cache is insufficient: return an empty strategy
            return new CacheStrategy(null, null);
        }
        return candidate;
    }

    CacheStrategy getCandidate() {
        // Work out the strategy from the header fields and hand it to the interceptor...
        return new CacheStrategy(xxx);
    }
}

getCandidate derives the strategy from the header fields and hands it to the interceptor; interested readers can read the source themselves.

So how is the cache written to disk? Follow the InternalCache interface, which is implemented inside the Cache class,

//Cache.java
InternalCache internalCache = new InternalCache() {
    @Override public Response get(Request request) throws IOException {
        return Cache.this.get(request); // read
    }

    @Override public CacheRequest put(Response response) throws IOException {
        return Cache.this.put(response); // write
    }

    // ...
};

Response get(Request request) {
    String key = key(request.url()); // key
    DiskLruCache.Snapshot snapshot; // cache snapshot
    Entry entry;
    snapshot = cache.get(key); // cache is OkHttp's DiskLruCache
    if (snapshot == null) {
        return null; // No cache, return directly
    }
    // The snapshot gets the input stream, which is used to create the cache entry
    entry = new Entry(snapshot.getSource(ENTRY_METADATA));
    // Get a response
    Response response = entry.response(snapshot);
    return response;
}

CacheRequest put(Response response) {
    String requestMethod = response.request().method();
    if (!requestMethod.equals("GET")) {
        // Not a GET request, so it is not cached
        return null;
    }
    // Wrap the response in a cache entry
    Entry entry = new Entry(response);
    DiskLruCache.Editor editor = null;
    editor = cache.edit(key(response.request().url()));
    // Write to the cache
    entry.writeTo(editor);
    return new CacheRequestImpl(editor);
}

OkHttp’s DiskLruCache manages the disk cache with a least-recently-used algorithm. It has a lot in common with Glide’s DiskLruCache, such as the same journal handling and an internal thread pool that cleans up the disk, but OkHttp’s version uses Okio. Interested readers can compare okhttp3.internal.cache.DiskLruCache and com.bumptech.glide.disklrucache.DiskLruCache.

Note: the cache is disabled by default; you have to enable it yourself:
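For readers new to least-recently-used eviction, here is an in-memory sketch built on the JDK's LinkedHashMap in access order. It only illustrates the eviction rule: LruSketch is a hypothetical class, it bounds the cache by entry count where a disk cache is bounded by bytes, and it has no journal or disk I/O.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory LRU map: the least recently accessed entry is dropped first.
class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    LruSketch(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true: iteration runs from least to most recently used
        this.maxEntries = maxEntries;
    }

    // Called by LinkedHashMap after every insertion; returning true evicts the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        LruSketch<String, String> cache = new LruSketch<>(2);
        cache.put("a", "response A");
        cache.put("b", "response B");
        cache.get("a");               // "a" is now the most recently used
        cache.put("c", "response C"); // evicts "b", the least recently used
        System.out.println(cache.keySet()); // prints [a, c]
    }
}
```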

new OkHttpClient.Builder()
    .cache(new Cache(new File(MyApp.APP.getCacheDir(), "okhttp_cache"), // path
                     50L * 1024L * 1024L)) // size
    .build();

The connection pool

Remember the Transmitter? We called it the captain earlier. It is the bridge between the application layer and the network layer, managing connections, requests, responses, and streams. From the interceptor section we know that:

  • RetryAndFollowUpInterceptor calls transmitter.prepareToConnect to prepare a connection

  • ConnectInterceptor calls transmitter.newExchange to create an exchange

Here are a few additional concepts:

Connection (implemented by RealConnection): a connection, an abstract concept that maintains a Socket internally

ConnectionPool (holds a RealConnectionPool): the connection pool, which manages the reuse of connections

Exchange: an exchange, which manages a request and its response and holds an ExchangeCodec

ExchangeCodec: the codec used to encode requests and decode responses. Implementations include Http1ExchangeCodec and Http2ExchangeCodec

HTTP/1.1: introduced the keep-alive mechanism; connections stay alive and multiple requests can reuse one connection, but the requests are serial

HTTP/2: supports multiplexing; multiple requests on one connection can run in parallel

See RealConnectionPool first,

//RealConnectionPool.java
// Thread pool used to clean expired connections. A connection pool can run a maximum of one thread
Executor executor =
    new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                           new SynchronousQueue<>(),
                           Util.threadFactory("OkHttp ConnectionPool", true));
// Maximum number of idle connections for each address, 5 by default
int maxIdleConnections;
// Keep-alive time of an idle connection, 5 minutes by default
long keepAliveDurationNs;
// Connection queue
Deque<RealConnection> connections = new ArrayDeque<>();

// Get the connection
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
                                           List<Route> routes, boolean requireMultiplexed) {
    for (RealConnection connection : connections) {
        // If multiplexing is required, skip connections that do not support it
        if (requireMultiplexed && !connection.isMultiplexed()) continue;
        // Skip connections that are not eligible for this address
        if (!connection.isEligible(address, routes)) continue;
        // Assign a connection to the captain
        transmitter.acquireConnectionNoEvents(connection);
        return true;
    }
    return false;
}

// Evict a connection; the executor runs cleanupRunnable, which calls this method
long cleanup(long now) {
    // Find either a connection to evict, or the time at which the next eviction is due
    synchronized (this) {
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            // ...
            if (idleDurationNs > longestIdleDurationNs) {
                longestIdleDurationNs = idleDurationNs;
                longestIdleConnection = connection;
            }
        }
        if (longestIdleDurationNs >= this.keepAliveDurationNs
            || idleConnectionCount > this.maxIdleConnections) {
            // Remove the connection
            connections.remove(longestIdleConnection);
        }
        // ... (otherwise return how long to wait before the next cleanup)
    }
    // Close the Socket
    closeQuietly(longestIdleConnection.socket());
    // Clean up again immediately
    return 0;
}

RealConnection’s code is fairly long; for now it is enough to know that it maintains the Socket internally.
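Going back to cleanup for a moment, its eviction rule can be modeled without any sockets. The sketch below is hypothetical (PoolCleanupSketch and Idle are invented, every connection in the list is assumed to be idle, and the method returns the evicted name instead of a delay), but the condition is the one from the snippet above: evict the longest-idle connection if it has outlived the keep-alive time or if there are too many idle connections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class PoolCleanupSketch {
    // Hypothetical idle connection: a name and the time it became idle.
    static final class Idle {
        final String name;
        final long idleSinceNanos;

        Idle(String name, long idleSinceNanos) {
            this.name = name;
            this.idleSinceNanos = idleSinceNanos;
        }
    }

    final int maxIdleConnections;
    final long keepAliveDurationNs;
    final List<Idle> connections = new ArrayList<>();

    PoolCleanupSketch(int maxIdleConnections, long keepAliveDurationNs) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = keepAliveDurationNs;
    }

    // Evicts at most one connection per pass and returns its name, or null if nothing was evicted.
    String cleanup(long nowNanos) {
        Idle longestIdle = null;
        long longestIdleNs = Long.MIN_VALUE;
        for (Idle c : connections) {
            long idleNs = nowNanos - c.idleSinceNanos;
            if (idleNs > longestIdleNs) {
                longestIdleNs = idleNs;
                longestIdle = c;
            }
        }
        if (longestIdle != null
            && (longestIdleNs >= keepAliveDurationNs || connections.size() > maxIdleConnections)) {
            connections.remove(longestIdle);
            return longestIdle.name;
        }
        return null;
    }

    public static void main(String[] args) {
        long minute = TimeUnit.MINUTES.toNanos(1);
        PoolCleanupSketch pool = new PoolCleanupSketch(5, 5 * minute);
        pool.connections.add(new Idle("a", 0L));
        pool.connections.add(new Idle("b", minute));
        System.out.println(pool.cleanup(2 * minute)); // prints null: nobody has been idle long enough
        System.out.println(pool.cleanup(5 * minute)); // prints a: idle for the full keep-alive time
    }
}
```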

As mentioned earlier, the number of simultaneous requests to the same host is limited to maxRequestsPerHost = 5. Why? Requests to the same host can share a connection, so presumably it is there to limit traffic. Using the flight analogy again: if there were no limit on simultaneous flights to Shanghai, wouldn’t the Shanghai airport get overcrowded? If you know the answer, please leave a comment.

Summary

OkHttp has the following advantages:

  • Simple to use, and the interceptor chain design makes it easy to extend
  • Automatically reconnects after a failed request, tries the host’s other IP addresses, and follows redirects
  • Handles gzip automatically
  • Local caching avoids duplicate requests
  • Requests to the same host can share a Socket, which is maintained by a Connection; the ConnectionPool manages connection reuse, avoiding frequent creation and destruction of connections

The end

OkHttp has many more wonderful details, such as its handling of cookies, routes, DNS, and TLS, which this article does not cover; the source code is still the place to learn them. While reading the source I also found many things I did not understand, such as the various protocols and standards. It is a good opportunity to fill in networking knowledge, so take off ~

Series of articles:

  • Glide from the Series “Never Forget”
