OkHttp Usage Analysis – WebSocket article

Let's take a look at how to make a WebSocket request using OkHttp:

  mOkHttpClient = new OkHttpClient.Builder()
      .connectTimeout(9 * 10, TimeUnit.SECONDS)
      .build();
  Request request = new Request.Builder().url(BASE_URL).build();
  mWebSocket = mOkHttpClient.newWebSocket(request, this);

In particular, open OkHttpClient and look for the newWebSocket() method:

  /**
   * Uses {@code request} to connect a new web socket.
   */
  @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random());
    webSocket.connect(this);
    return webSocket;
  }

A Request is passed in along with a WebSocketListener. The WebSocketListener will be described later; the main flow is in the connect() method of RealWebSocket, which starts with:

 client = client.newBuilder()
        .protocols(ONLY_HTTP1)
        .build();

For a normal request the client is built once, so why does connect() take the existing OkHttpClient and build it again? The answer is in protocols(ONLY_HTTP1): the WebSocket handshake is an HTTP/1.1 Upgrade request, so the rebuilt client is restricted to HTTP/1.1:

 private static final List<Protocol> ONLY_HTTP1 = Collections.singletonList(Protocol.HTTP_1_1);

Step 2:

 final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();

This adds the WebSocket handshake headers to the Request object.
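The key value in the Sec-WebSocket-Key header is not explained here. As a rough sketch using only the JDK, and assuming the rules of RFC 6455 (a Base64-encoded 16-byte nonce, answered by the server with a SHA-1 based Sec-WebSocket-Accept), it could be produced and checked as shown below. The class and method names are made up for this illustration; OkHttp's own code may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper, not part of OkHttp.
final class HandshakeKeySketch {
  // Fixed GUID that RFC 6455 appends to the key before hashing.
  static final String ACCEPT_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

  /** Generates a Sec-WebSocket-Key: 16 random bytes, Base64-encoded. */
  static String newKey() {
    byte[] nonce = new byte[16];
    new SecureRandom().nextBytes(nonce);
    return Base64.getEncoder().encodeToString(nonce);
  }

  /** Computes the Sec-WebSocket-Accept value the server is expected to send back. */
  static String acceptFor(String key) {
    try {
      MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
      byte[] digest = sha1.digest((key + ACCEPT_MAGIC).getBytes(StandardCharsets.US_ASCII));
      return Base64.getEncoder().encodeToString(digest);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-1 is not available", e);
    }
  }

  public static void main(String[] args) {
    String key = newKey();
    System.out.println("Sec-WebSocket-Key: " + key);
    System.out.println("Expected Sec-WebSocket-Accept: " + acceptFor(key));
  }
}
```

The client can compare the Sec-WebSocket-Accept header of the response with acceptFor(key) to confirm that the server really understood the upgrade request.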

Step 3:

 call = Internal.instance.newWebSocketCall(client, request);

Get the WebSocket call object from OkHttpClient. Internal.instance is implemented in OkHttpClient:

 @Override public Call newWebSocketCall(OkHttpClient client, Request originalRequest) {
        return new RealCall(client, originalRequest, true);
      }

Step 4: The call is started with RealCall's enqueue() method, which is asynchronous. This means the WebSocket callback is only invoked once the connection has been established. And because this is a long-lived connection, the thread stays occupied in the thread pool and is not released while the connection is open.

call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

With the callback in place, switch to RealCall's enqueue():

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
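To make the two effects of enqueue() concrete (the "Already Executed" guard, and the hand-off to another thread), here is a small JDK-only sketch. MiniCall and MiniCallback are hypothetical stand-ins for RealCall and Callback, and a plain cached thread pool stands in for the dispatcher; this is an illustration under those assumptions, not OkHttp's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class EnqueueSketch {
  /** Hypothetical callback type, standing in for OkHttp's Callback. */
  interface MiniCallback {
    void onResponse(String response);
  }

  /** Hypothetical call type that mimics the "executed" guard shown above. */
  static final class MiniCall {
    private final ExecutorService executor;
    private boolean executed;

    MiniCall(ExecutorService executor) {
      this.executor = executor;
    }

    void enqueue(MiniCallback callback) {
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      // The work and the callback run on a pool thread, not on the caller's thread.
      executor.execute(() -> callback.onResponse("ok"));
    }
  }

  /** Enqueues one call and returns the name of the thread that ran the callback. */
  static String callbackThreadName() {
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      AtomicReference<String> threadName = new AtomicReference<>();
      CountDownLatch done = new CountDownLatch(1);
      new MiniCall(executor).enqueue(response -> {
        threadName.set(Thread.currentThread().getName());
        done.countDown();
      });
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("callback was not invoked in time");
      }
      return threadName.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("caller thread:   " + Thread.currentThread().getName());
    System.out.println("callback thread: " + callbackThreadName());
  }
}
```

The caller returns from enqueue() immediately, and a second enqueue() on the same call object fails fast instead of sending the request twice.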

I actually had a couple of questions about OkHttp requests at this point: 1. I never created a thread myself, so does the request run on the main thread? 2. If several requests are issued at the same time, does a later request have to wait while an earlier one is still executing? That's where the Dispatcher's thread pool comes in.

In enqueue(), RealCall wraps the callback in an AsyncCall. OkHttp WebSockets therefore use AsyncCall as well, and it is worth keeping in mind that this only establishes the initial socket. All of it happens on one thread of the thread pool.

Answer to question 1: OkHttp WebSockets are asynchronous. They do not block the main thread, and no separate child thread has to be created for the connection. Question 2: does one request block the next? First, let's look again at the structure of the executorService thread pool. The Dispatcher's thread pool was introduced in the section on synchronous requests, but in my opinion not clearly enough. This is how the Dispatcher's thread pool is constructed:

executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));

I'll elaborate here. First, SynchronousQueue is a blocking queue with no buffer. What does that mean? It never stores an element: an insert can only complete when another thread is removing that element at the same moment.
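The hand-off behaviour is easy to try with the JDK alone. The sketch below is my own illustration (the class and method names are made up): a non-blocking offer() is rejected when no consumer is waiting, while put() succeeds as soon as a consumer thread calls take().

```java
import java.util.concurrent.SynchronousQueue;

final class HandOffSketch {
  /** Hands one element to a consumer thread and returns what the consumer received. */
  static String handOff(String element) {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    String[] received = new String[1];
    Thread consumer = new Thread(() -> {
      try {
        received[0] = queue.take(); // blocks until a producer arrives
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    try {
      queue.put(element); // blocks until the consumer takes the element
      consumer.join();    // join() makes the consumer's write visible here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return received[0];
  }

  /** With no consumer waiting, a non-blocking offer is rejected immediately. */
  static boolean offerWithoutConsumer() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    return queue.offer("task");
  }

  public static void main(String[] args) {
    System.out.println("offer without consumer accepted? " + offerWithoutConsumer());
    System.out.println("handed off: " + handOff("task"));
    System.out.println("reported size: " + new SynchronousQueue<String>().size());
  }
}
```

The rejected offer() is the important part for a thread pool: when the queue refuses a task, the pool has to start a new worker thread for it.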

Note 1: It is a blocking queue in which each put must wait for a take, and vice versa. A synchronous queue has no internal capacity, not even a capacity of one.

Note 2: It is thread-safe and blocking.

Note 3: Null elements are not allowed.

Note 4: The fairness policy applies to the threads waiting in put or take. For fairness policies in general, see ArrayBlockingQueue.

So this solves a puzzle that had bothered me for years: how many requests can OkHttp execute at the same time? The pool has corePoolSize=0, maximumPoolSize=Integer.MAX_VALUE, keepAliveTime=60s and a SynchronousQueue with no capacity, so a submitted task gets a new thread whenever no idle thread is available to take it, and threads that stay idle for longer than 60 seconds are destroyed. This is the same configuration as Executors.newCachedThreadPool():

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }



To use a metaphor, the queue is a pure hand-off: it holds nothing itself and passes each runnable submitted from the calling thread straight to a worker thread.
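The "one new thread per concurrent task" behaviour can be observed directly. This sketch builds a pool with the same constructor arguments as above (without OkHttp's thread factory) and submits tasks that all block, so no worker is ever idle. The class name is made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CachedPoolSketch {
  /** Submits {@code tasks} blocking tasks and returns how many threads the pool created. */
  static int threadsCreatedFor(int tasks) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          try {
            release.await(); // keep this worker busy so it cannot pick up another task
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      return pool.getPoolSize();
    } finally {
      release.countDown(); // let all workers finish
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("threads for 8 concurrent tasks: " + threadsCreatedFor(8));
  }
}
```

Because the pool itself never refuses work, any upper bound on concurrency has to come from somewhere else, which is what the Dispatcher's own bookkeeping does.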

Here is the Dispatcher’s asynchronous start method:

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

Each running call is recorded in runningAsyncCalls. When a call is passed to the executor, the task goes through the SynchronousQueue described above, so it must be taken by a worker thread before the next one can be handed over. Here maxRequests is set to 64; anything beyond 64 is put into readyAsyncCalls. How do calls move from ready to running? A synchronous request goes through @Override public Response execute() throws IOException, while an asynchronous one ends up in AsyncCall's execute():

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

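The callback has been invoked by now. What matters here is the finished() call in the finally block: in this version of the Dispatcher it removes the call from the running collection and then promotes waiting calls: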

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
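The interplay between the ready and running collections can be sketched without any networking. The class below is a simplified model written for this article, not OkHttp's Dispatcher: calls are plain strings, nothing is actually executed, and the per-host limit is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Simplified, hypothetical model of the ready/running bookkeeping.
final class DispatcherSketch {
  private final int maxRequests;
  private final Deque<String> readyCalls = new ArrayDeque<>();
  private final Deque<String> runningCalls = new ArrayDeque<>();

  DispatcherSketch(int maxRequests) {
    this.maxRequests = maxRequests;
  }

  /** Runs the call if there is capacity, otherwise parks it in the ready queue. */
  synchronized void enqueue(String call) {
    if (runningCalls.size() < maxRequests) {
      runningCalls.add(call);
    } else {
      readyCalls.add(call);
    }
  }

  /** Called when a call completes: frees its slot and promotes waiting calls. */
  synchronized void finished(String call) {
    if (!runningCalls.remove(call)) throw new AssertionError("Call wasn't running: " + call);
    promoteCalls();
  }

  private void promoteCalls() {
    for (Iterator<String> i = readyCalls.iterator(); i.hasNext(); ) {
      if (runningCalls.size() >= maxRequests) return; // reached max capacity
      String call = i.next();
      i.remove();
      runningCalls.add(call);
    }
  }

  synchronized int runningCount() {
    return runningCalls.size();
  }

  synchronized int readyCount() {
    return readyCalls.size();
  }

  public static void main(String[] args) {
    DispatcherSketch dispatcher = new DispatcherSketch(2);
    dispatcher.enqueue("a");
    dispatcher.enqueue("b");
    dispatcher.enqueue("c"); // over capacity, waits in the ready queue
    System.out.println(dispatcher.runningCount() + " running, " + dispatcher.readyCount() + " ready");
    dispatcher.finished("a"); // frees a slot, so "c" is promoted
    System.out.println(dispatcher.runningCount() + " running, " + dispatcher.readyCount() + " ready");
  }
}
```

So a later request is not blocked by an earlier one until the limit is reached; beyond the limit it simply waits in the ready queue until some running call finishes.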

This raises another question: how does OkHttp compare with Volley here?

WebSocket article:

A WebSocket connection is created on a child thread. As long as the connection is not closed, that thread continues to exist. Before the connection is created, a RealWebSocket is constructed:

  public RealWebSocket(Request request, WebSocketListener listener, Random random) {
    // ...
    this.writerRunnable = new Runnable() {
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
  }

Here a write task, writerRunnable, is created. Now look at the connect() method again, this time only at the call's callback. Assuming the connection succeeds, the call's onResponse method continues with:

        try {
          listener.onOpen(RealWebSocket.this, response);
          String name = "OkHttp WebSocket " + request.url().redact();
          initReaderAndWriter(name, pingIntervalMillis, streams);
          streamAllocation.connection().socket().setSoTimeout(0);
          loopReader();
        } catch (Exception e) {
          failWebSocket(e, null);
        }
      }

1. initReaderAndWriter() initializes the writer, in preparation for interacting with the server:

 this.writer = new WebSocketWriter(streams.client, streams.sink, random);
 this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));

A writer is prepared, and an executor for scheduled tasks (ping/pong) is prepared. What does the executor run? Look at runWriter():

  private void runWriter() {
    assert (Thread.holdsLock(this));

    if (executor != null) {
      executor.execute(writerRunnable);
    }
  }

Ha, so this executor writes the queued frames, and it is also what the heartbeat runs on, regularly telling the server that we are still here.

2. loopReader() starts a loop that reads messages (ready to receive messages from the server):

  public void loopReader() throws IOException {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader.processNextFrame();
    }
  }

Isn't this just a loop that keeps calling reader.processNextFrame()?

  /**
   * Process the next protocol frame.
   *
   * <ul>
   *     <li>If it is a control frame this will result in a single call to {@link FrameCallback}.
   *     <li>If it is a message frame this will result in a single call to {@link
   *         FrameCallback#onReadMessage}. If the message spans multiple frames, each interleaved
   *         control frame will result in a corresponding call to {@link FrameCallback}.
   * </ul>
   */
  void processNextFrame() throws IOException {
    readHeader();
    if (isControlFrame) {
      readControlFrame();
    } else {
      readMessageFrame();
    }
  }

A control frame results in a single call to FrameCallback; a message frame results in a single call to FrameCallback#onReadMessage.
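How does the reader know which kind of frame it is looking at? According to RFC 6455, the first byte of every frame carries the FIN bit and a four-bit opcode, and the control opcodes (close 0x8, ping 0x9, pong 0xA) all have the 0x8 bit set. The sketch below is my own illustration of that check; the class, method and constant names are chosen for this example and are not copied from OkHttp.

```java
// Hypothetical helper that inspects the first byte of a WebSocket frame.
final class FrameHeaderSketch {
  static final int B0_FLAG_FIN = 0x80;         // final-fragment bit
  static final int B0_MASK_OPCODE = 0x0f;      // low four bits carry the opcode
  static final int OPCODE_FLAG_CONTROL = 0x08; // set on close (0x8), ping (0x9), pong (0xA)

  static int opcode(int b0) {
    return b0 & B0_MASK_OPCODE;
  }

  static boolean isFinalFrame(int b0) {
    return (b0 & B0_FLAG_FIN) != 0;
  }

  static boolean isControlFrame(int b0) {
    return (b0 & OPCODE_FLAG_CONTROL) != 0;
  }

  /** Mirrors the branch in processNextFrame(): control and message frames go separate ways. */
  static String dispatch(int b0) {
    return isControlFrame(b0) ? "readControlFrame" : "readMessageFrame";
  }

  public static void main(String[] args) {
    int ping = 0x89; // FIN + opcode 0x9
    int text = 0x81; // FIN + opcode 0x1
    System.out.println("0x89 -> " + dispatch(ping) + ", opcode " + opcode(ping));
    System.out.println("0x81 -> " + dispatch(text) + ", opcode " + opcode(text));
  }
}
```

Since control frames may be interleaved between the fragments of one message, the reader has to make this decision for every single frame header.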

With that, the WebSocket flow is basically done; all that's left is to call the listener.

~~~~~~~~~~~~~~ Supplementary section ~~~~~~~~~~~~~~~

Thanks to the readers who pointed this out. I wrote this article quite a while ago and have forgotten a lot of the details, which is embarrassing. To restate the question: "Does the framework send ping packets automatically? How do I set the interval?"

Yes, and OkHttpClient also supports setting the heartbeat interval, through pingInterval() on OkHttpClient.Builder. Back in the call's callback:

 // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);

The ping interval is also passed along here:

  initReaderAndWriter(name, pingIntervalMillis, streams);

That's right, back to the initializer once more. Inside initReaderAndWriter() there is this statement:

      if (pingIntervalMillis != 0) {
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }

Two things follow from this: 1. If pingIntervalMillis is 0, no heartbeat is scheduled at all. 2. The executor is also responsible for the scheduled heartbeat task.
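The scheduling pattern itself can be tried with the JDK alone. In this sketch a CountDownLatch stands in for the ping frames, so "sending a ping" just means counting down; the class and method names are invented for the illustration and nothing here touches a real socket.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PingScheduleSketch {
  /**
   * Schedules a stand-in "ping" every {@code pingIntervalMillis} and waits up to
   * {@code waitMillis} for {@code expectedPings} of them. Returns true if they all arrived.
   */
  static boolean awaitPings(long pingIntervalMillis, int expectedPings, long waitMillis) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    CountDownLatch pings = new CountDownLatch(expectedPings);
    try {
      if (pingIntervalMillis != 0) { // an interval of 0 means: never schedule the heartbeat
        executor.scheduleAtFixedRate(
            pings::countDown, pingIntervalMillis, pingIntervalMillis, TimeUnit.MILLISECONDS);
      }
      return pings.await(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow(); // stop the periodic task
    }
  }

  public static void main(String[] args) {
    System.out.println("interval 10 ms, 3 pings seen: " + awaitPings(10, 3, 5000));
    System.out.println("interval 0, any ping seen:    " + awaitPings(0, 1, 100));
  }
}
```

With a single-threaded scheduled executor, the periodic task shares its one thread with anything else submitted to the same executor.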

Let's see what PingRunnable does:

  private final class PingRunnable implements Runnable {
    PingRunnable() {
    }

    @Override public void run() {
      writePingFrame();
    }
  }

  void writePingFrame() {
    WebSocketWriter writer;
    synchronized (this) {
      if (failed) return;
      writer = this.writer;
    }

    try {
      writer.writePing(ByteString.EMPTY);
    } catch (IOException e) {
      failWebSocket(e, null);
    }
  }

The runnable calls the writer's writePing() method. After all, sending any message requires the writer, so it is no surprise that the writer has these methods. Let's look at the writer:

  /** Send a ping with the supplied {@code payload}. */
  void writePing(ByteString payload) throws IOException {
    synchronized (this) {
      writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload);
    }
  }

  /** Send a pong with the supplied {@code payload}. */
  void writePong(ByteString payload) throws IOException {
    synchronized (this) {
      writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload);
    }
  }

By the way, the method for sending a pong sits just below it. Some observations: 1. The payload passed in for a ping is ByteString.EMPTY, an empty byte string. 2. Both methods end up in the same method, writeControlFrameSynchronized(). 3. The two kinds of message are distinguished by the opcode passed as the first argument. 4. writeControlFrameSynchronized() has no comment, but since writing these messages goes through it, this is where the writer does its real work:

  private void writeControlFrameSynchronized(int opcode, ByteString payload) throws IOException {
    assert Thread.holdsLock(this);

    if (writerClosed) throw new IOException("closed");

    int length = payload.size();
    if (length > PAYLOAD_BYTE_MAX) {
      throw new IllegalArgumentException(
          "Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);
    }

    int b0 = B0_FLAG_FIN | opcode;
    sink.writeByte(b0);

    int b1 = length;
    if (isClient) {
      b1 |= B1_FLAG_MASK;
      sink.writeByte(b1);

      random.nextBytes(maskKey);
      sink.write(maskKey);

      byte[] bytes = payload.toByteArray();
      toggleMask(bytes, bytes.length, maskKey, 0);
      sink.write(bytes);
    } else {
      sink.writeByte(b1);
      sink.write(payload);
    }

    sink.flush();
  }
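To see what actually ends up on the wire, the client branch of this method can be imitated with a ByteArrayOutputStream in place of the sink. This is a sketch under my own assumptions: the mask key is passed in so the result is reproducible, the mask is applied with a simple modulo index, and the class is not part of OkHttp.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Random;

// Hypothetical stand-alone encoder for a masked (client-side) control frame.
final class ControlFrameSketch {
  static final int B0_FLAG_FIN = 0x80;
  static final int B1_FLAG_MASK = 0x80;
  static final int OPCODE_CONTROL_PING = 0x9;
  static final int PAYLOAD_BYTE_MAX = 125;

  /** XORs each byte with the 4-byte mask key; applying it twice restores the input. */
  static void toggleMask(byte[] buffer, byte[] maskKey) {
    for (int i = 0; i < buffer.length; i++) {
      buffer[i] = (byte) (buffer[i] ^ maskKey[i % maskKey.length]);
    }
  }

  /** Encodes a client-side control frame: header byte, length byte, mask key, masked payload. */
  static byte[] clientControlFrame(int opcode, byte[] payload, byte[] maskKey) {
    if (maskKey.length != 4) throw new IllegalArgumentException("Mask key must be 4 bytes");
    if (payload.length > PAYLOAD_BYTE_MAX) {
      throw new IllegalArgumentException(
          "Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(B0_FLAG_FIN | opcode);          // byte 0: FIN bit + opcode
    out.write(B1_FLAG_MASK | payload.length); // byte 1: MASK bit + payload length
    out.write(maskKey, 0, maskKey.length);    // 4-byte masking key
    byte[] masked = payload.clone();
    toggleMask(masked, maskKey);
    out.write(masked, 0, masked.length);      // masked payload
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] maskKey = new byte[4];
    new Random().nextBytes(maskKey);
    byte[] frame = clientControlFrame(
        OPCODE_CONTROL_PING, "hi".getBytes(StandardCharsets.UTF_8), maskKey);
    System.out.println("ping frame length: " + frame.length + " bytes");
  }
}
```

An empty ping, like the one PingRunnable sends, is therefore only six bytes long: two header bytes plus the four-byte mask key.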

Impressive. Details aside, the gist is clear: everything ends up being written into this sink!

Here’s the question: What is a sink?

  /** Writes must be guarded by synchronizing on 'this'. */
  final BufferedSink sink;

This doesn't explain what a sink is, but it does remind us that writes to the sink must be synchronized, which is why the ping and pong methods take the lock.

Sink, sink, sink... what on earth is it? I still don't understand, so let's look at what BufferedSink's documentation says instead:

  • A sink that keeps a buffer internally so that callers can do small writes without a performance penalty.
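I can't show BufferedSink itself without pulling in Okio, but the JDK has a close analogy: BufferedOutputStream. The sketch below is only that analogy, not Okio's implementation. It performs many one-byte writes and shows that nothing reaches the destination until flush() is called (as long as the total stays below the default 8192-byte buffer).

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// JDK analogy for a buffered sink; the class name is made up for this illustration.
final class BufferedSinkSketch {
  /** Returns {bytes visible before flush, bytes visible after flush}. */
  static int[] writeSmallChunks(int chunks) {
    ByteArrayOutputStream destination = new ByteArrayOutputStream();
    BufferedOutputStream buffered = new BufferedOutputStream(destination);
    try {
      for (int i = 0; i < chunks; i++) {
        buffered.write(i); // a one-byte write lands in the in-memory buffer
      }
      int beforeFlush = destination.size();
      buffered.flush(); // pushes everything downstream in one go
      return new int[] {beforeFlush, destination.size()};
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    int[] sizes = writeSmallChunks(100);
    System.out.println("before flush: " + sizes[0] + " bytes, after flush: " + sizes[1] + " bytes");
  }
}
```

This is the same reason writeControlFrameSynchronized() can call writeByte() several times and finish with a single flush().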

(For reference, the limit checked in writeControlFrameSynchronized() is static final long PAYLOAD_BYTE_MAX = 125L.)

BufferedSink is an interface, but that already tells us enough. To see which implementation is actually used, go back to where the writer was created:

  this.writer = new WebSocketWriter(streams.client, streams.sink, random);

Oh? The sink is obtained from streams at initialization time. Tracing how streams is created leads back to the call's callback when the connection succeeds:

  @Override public void onResponse(Call call, Response response) {
    // ...
    // Promote the HTTP streams into web socket streams.
    StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
    // Prevent connection pooling!
    streamAllocation.noNewStreams();
    // Create the streams.
    Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
    // ...
  }

It seems that all the answers lie in RealConnection's newWebSocketStreams():

  public RealWebSocket.Streams newWebSocketStreams(final StreamAllocation streamAllocation) {
    return new RealWebSocket.Streams(true, source, sink) {
      @Override public void close() throws IOException {
        streamAllocation.streamFinished(true, streamAllocation.codec());
      }
    };
  }

new RealWebSocket.Streams(true, source, sink): this is where the sink comes from. RealConnection looks quite familiar, though. When was it created? Let me take a closer look at it…