Okio is an open source library from Square that complements java.io and java.nio to make accessing, storing, and processing data easier. It started out as a component of OkHttp.

1. ByteString and Buffer

Okio is built around two classes, ByteString and Buffer, which encapsulate most of its functionality:

  • ByteString: an immutable class, similar to String, that makes it easy to convert between byte and String. It provides encoding/decoding methods for HEX, MD5, Base64, and UTF-8.
  • Buffer: a mutable sequence of bytes. As with ArrayList, there is no need to size the buffer beforehand. Internally, Buffer maintains a doubly linked list; data is written at the tail and read from the head.

ByteString and Buffer are both designed to save CPU and memory. If you encode a String as a ByteString, the ByteString caches a reference to that String (trading space for time), so later encoding or decoding does not need another conversion between byte and String.

  // The byte data corresponding to the string, avoiding another conversion
  final byte[] data;
  // The cached string form of the bytes
  transient String utf8; // Lazily computed.
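The caching idea behind these two fields can be sketched without Okio. The class below is a hypothetical stand-in (CachedUtf8Bytes and its methods are made-up names, not Okio’s API): encoding keeps a reference to the original String, while wrapping raw bytes defers decoding until the text is first requested.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the idea behind ByteString's utf8 field; not Okio's class.
final class CachedUtf8Bytes {
    private final byte[] data; // the encoded bytes, never exposed to callers
    private String utf8;       // cached decoded form, computed lazily

    private CachedUtf8Bytes(byte[] data, String utf8) {
        this.data = data;
        this.utf8 = utf8;
    }

    // Encoding keeps a reference to the original String, so utf8() needs no decoding later.
    static CachedUtf8Bytes encodeUtf8(String s) {
        return new CachedUtf8Bytes(s.getBytes(StandardCharsets.UTF_8), s);
    }

    // Wrapping raw bytes defers decoding until the text is first requested.
    static CachedUtf8Bytes of(byte[] bytes) {
        return new CachedUtf8Bytes(bytes.clone(), null);
    }

    String utf8() {
        String result = utf8;
        if (result == null) {
            result = new String(data, StandardCharsets.UTF_8);
            utf8 = result; // remember it for the next call
        }
        return result;
    }

    int size() {
        return data.length;
    }
}
```

Calling utf8() on an instance created with encodeUtf8 returns the very String that was passed in, which is the "space for time" trade described above.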

Buffer internally maintains a doubly linked list whose nodes are Segments. When data is moved from one Buffer to another, Okio reassigns ownership of the Segments instead of copying the bytes across or creating new Segment objects.
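A minimal sketch of that ownership transfer, assuming a much simpler structure than Okio’s: MiniBuffer is a hypothetical class that keeps its segments in an ArrayDeque, and moveAllTo hands the same byte arrays to the target buffer rather than copying their contents.

```java
import java.util.ArrayDeque;

// Hypothetical miniature of a segment-based buffer; names are illustrative, not Okio's.
final class MiniBuffer {
    private final ArrayDeque<byte[]> segments = new ArrayDeque<>();
    private long size;

    // Appends a chunk as a new segment at the tail.
    void write(byte[] chunk) {
        segments.addLast(chunk);
        size += chunk.length;
    }

    // Moves every segment to the tail of target by handing over references; no bytes are copied.
    void moveAllTo(MiniBuffer target) {
        while (!segments.isEmpty()) {
            byte[] segment = segments.removeFirst();
            target.segments.addLast(segment);
            target.size += segment.length;
        }
        size = 0;
    }

    // Returns the first segment, or null if the buffer is empty.
    byte[] headSegment() {
        return segments.peekFirst();
    }

    long size() {
        return size;
    }
}
```

After a.moveAllTo(b), b’s head segment is the same array object that was written to a, so the cost of the move depends on the number of segments, not the number of bytes.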

2. Source and Sink

Okio has its own stream types, Source and Sink, which work like InputStream and OutputStream but have the following advantages over Java I/O (see the Android Learning Notes entry on Okio in the references):

  • Okio implements a timeout mechanism (Timeout) for I/O reads and writes, so an operation that goes wrong cannot block forever.
  • Many in one: Okio cuts down the number of input and output stream classes.
  • Low CPU and memory consumption, thanks to the Segment and SegmentPool reuse mechanism.
  • Easy to use: ByteString handles immutable bytes, and Buffer handles mutable bytes.
  • A range of tools is provided: Okio supports data processing such as MD5, SHA, and Base64.

Source and Sink interoperate with InputStream and OutputStream. Any Source can be treated as an InputStream, and any InputStream as a Source. The same applies to Sink and OutputStream.

3. Okio data reading and writing process

With that brief introduction out of the way, let’s look at how to use Okio.

    // Copying an image with Okio
    public void copyImage(File sinkFile, File sourceFile) throws IOException {
        // try-with-resources closes the streams in reverse order,
        // so the buffered sink is flushed before the raw sink is closed
        try (Sink sink = Okio.sink(sinkFile);
             BufferedSink bufferedSink = Okio.buffer(sink);
             // Read data from a file
             Source source = Okio.source(sourceFile);
             BufferedSource bufferedSource = Okio.buffer(source)) {
            // Set a deadline of 1 second before any data is written
            sink.timeout().deadline(1, TimeUnit.SECONDS);
            // Copy the image
            bufferedSink.write(bufferedSource.readByteArray());
        }
    }

    // Other common read/write calls; entry is any key/value pair to write out
    public void textExamples(BufferedSink bufferedSink, BufferedSource bufferedSource,
                             Map.Entry<String, String> entry) throws IOException {
        // Write strings in UTF-8 format. Okio handles UTF-8 specially
        bufferedSink.writeUtf8(entry.getKey())
                    .writeUtf8("=")
                    .writeUtf8(entry.getValue())
                    .writeUtf8("\n");
        // Read all remaining data as a UTF-8 string
        String str = bufferedSource.readUtf8();
        // Read data and return a ByteString (empty here, because readUtf8 already consumed the source)
        ByteString byteString = bufferedSource.readByteString();
    }

As mentioned earlier, Okio is very easy to use. Because Java strings are encoded in UTF-16, and most development uses UTF-8, Okio makes a special case for string encoding.
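To see why the encoding matters, here is a small standard-library sketch (Utf8Demo is a hypothetical helper, not part of Okio) that compares a string’s length in UTF-16 code units with its size in UTF-8 bytes and round-trips it through UTF-8, roughly what a write followed by a read does.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; it uses only the JDK, not Okio.
final class Utf8Demo {
    // Number of bytes the string occupies once encoded as UTF-8.
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    // Encodes the string to UTF-8 bytes and decodes it again.
    static String roundTrip(String s) {
        byte[] encoded = s.getBytes(StandardCharsets.UTF_8);
        return new String(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String text = "caf\u00e9"; // 4 UTF-16 code units; the last one needs 2 bytes in UTF-8
        System.out.println(text.length() + " chars, " + utf8Length(text) + " UTF-8 bytes");
        // prints "4 chars, 5 UTF-8 bytes"
    }
}
```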

3.1 How Okio reads data

Source means the source of the water and corresponds to an input stream. A Source object is obtained through the Okio.source method.

  // The Okio class has quite a few overloads of source
  public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }
  private static Source source(final InputStream in, final Timeout timeout) {
    ...
    // This is where the data is actually read
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        ...
        try {
          // Before each read, check whether the timeout has been reached. By default, no timeout is set
          timeout.throwIfReached();
          // Get the last node of the list
          Segment tail = sink.writableSegment(1);
          // Since the SIZE of each Segment is 8KB, each copy cannot exceed this value
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          // Read data through InputStream
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          // End of stream: there is nothing more to read
          if (bytesRead == -1) return -1;
          // Advance the writable position
          tail.limit += bytesRead;
          // Total bytes read
          sink.size += bytesRead;
          // Returns the number of bytes currently read
          return bytesRead;
        } catch (AssertionError e) {
          ...
        }
      }
      ...
    };
  }

As you can see, this Source is an anonymous object. Once you have it, you wrap it in a BufferedSource through the Okio.buffer method. BufferedSource is an interface whose implementation class is RealBufferedSource.

In the example above, the readByteArray method of RealBufferedSource is called to read the data. Let’s see how this method is implemented.

  // The Buffer that backs RealBufferedSource
  public final Buffer buffer = new Buffer();
  @Override public byte[] readByteArray() throws IOException {
    // Read everything from the source into the buffer
    buffer.writeAll(source);
    // Return all data as a byte array
    return buffer.readByteArray();
  }

In the readByteArray method, the data is first written into the Buffer, which builds up the doubly linked list.

  @Override public long writeAll(Source source) throws IOException {
    if (source == null) throw new IllegalArgumentException("source == null");
    long totalBytesRead = 0;
    // Source is the anonymous source object created in Okio earlier
    for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) {
      totalBytesRead += readCount;
    }
    return totalBytesRead;
  }

After writing data to Buffer, Buffer’s readByteArray method is called to generate an array of bytes and return it.

    @Override
    public byte[] readByteArray() {
        try {
            // Read all size bytes currently held in the buffer
            return readByteArray(size);
        } catch (EOFException e) {
            throw new AssertionError(e);
        }
    }
    @Override
    public byte[] readByteArray(long byteCount) throws EOFException {
        checkOffsetAndCount(size, 0, byteCount);
        ...
        // Create a byte array of byteCount bytes
        byte[] result = new byte[(int) byteCount];
        // Write the read data to the array
        readFully(result);
        return result;
    }
    @Override
    public void readFully(byte[] sink) throws EOFException {
        int offset = 0;
        while (offset < sink.length) {
            // Write data to sink array continuously
            int read = read(sink, offset, sink.length - offset);
            if (read == -1) throw new EOFException();
            offset += read;
        }
    }
    @Override
    public int read(byte[] sink, int offset, int byteCount) {
        checkOffsetAndCount(sink.length, offset, byteCount);

        Segment s = head;
        if (s == null) return -1;
        int toCopy = Math.min(byteCount, s.limit - s.pos);
        // Copy the data
        System.arraycopy(s.data, s.pos, sink, offset, toCopy);

        s.pos += toCopy;
        size -= toCopy;
        // Release Segment and put it into buffer pool
        if (s.pos == s.limit) {
            head = s.pop();
            SegmentPool.recycle(s);
        }

        return toCopy;
    }

This copies the data into a new array; each Segment is reset and returned to the pool as soon as it has been read completely.
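The read path in this section boils down to a loop that never asks the InputStream for more than the room left in the current 8 KB chunk. The sketch below shows that loop using only the JDK; ChunkedReader and its methods are hypothetical names, and it records chunk lengths instead of building a linked list of Segments.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the read loop; it tracks chunk lengths only and is not Okio's code.
final class ChunkedReader {
    static final int SEGMENT_SIZE = 8192; // mirrors Segment.SIZE

    // Reads the stream to its end, filling one 8 KB chunk at a time, and returns each chunk's length.
    static List<Integer> readAll(InputStream in) throws IOException {
        List<Integer> chunkLengths = new ArrayList<>();
        byte[] segment = new byte[SEGMENT_SIZE];
        int limit = 0; // next writable position in the current chunk
        while (true) {
            // Never ask for more than the room left in the current chunk
            int bytesRead = in.read(segment, limit, SEGMENT_SIZE - limit);
            if (bytesRead == -1) break; // end of stream
            limit += bytesRead;
            if (limit == SEGMENT_SIZE) { // the chunk is full: start a new one
                chunkLengths.add(limit);
                segment = new byte[SEGMENT_SIZE];
                limit = 0;
            }
        }
        if (limit > 0) chunkLengths.add(limit); // partially filled last chunk
        return chunkLengths;
    }

    // Convenience wrapper for in-memory data.
    static List<Integer> chunkLengthsOf(byte[] data) {
        try {
            return readAll(new ByteArrayInputStream(data));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For 20,000 bytes of input, chunkLengthsOf returns two full chunks of 8192 bytes followed by one chunk of 3616 bytes.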

3.2 How Okio writes data

Sink means the place the water drains into and corresponds to an output stream. A Sink object is obtained through the Okio.sink method.

  public static Sink sink(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return sink(new FileOutputStream(file));
  }
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    ...
    // Create an anonymous Sink object
    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        // Write data
        while (byteCount > 0) {
          // Before each write, check whether the timeout has been reached. By default, no timeout is set
          timeout.throwIfReached();
          // Get the head segment
          Segment head = source.head;
          // The number of bytes to copy from this segment
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          // Write data via OutputStream
          out.write(head.data, head.pos, toCopy);
          // Advance the readable position
          head.pos += toCopy;
          // Reduce the number of bytes still to be written
          byteCount -= toCopy;
          // Reduce the number of bytes in the buffer
          source.size -= toCopy;
          // The segment has been read completely
          if (head.pos == head.limit) {
            // Release the node
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }
      ...
    };
  }

After obtaining the Sink object, the object is passed to BufferedSink. BufferedSink is an interface whose concrete implementation is RealBufferedSink.

  public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }

In Section 3.1, we looked at reading data through InputStream and returning an array of bytes. Here we write this array to the new file using the write method of RealBufferedSink.

  @Override public BufferedSink write(byte[] source) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source);
    return emitCompleteSegments();
  }

Writing data is basically the same as reading data. The data needs to be written to Buffer first.

    @Override
    public Buffer write(byte[] source) {
        if (source == null) throw new IllegalArgumentException("source == null");
        return write(source, 0, source.length);
    }
    @Override
    public Buffer write(byte[] source, int offset, int byteCount) {
        ...
        int limit = offset + byteCount;
        while (offset < limit) {
            Segment tail = writableSegment(1);

            int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
            // Copy the data
            System.arraycopy(source, offset, tail.data, tail.limit, toCopy);

            offset += toCopy;
            tail.limit += toCopy;
        }

        size += byteCount;
        return this;
    }

As mentioned above, Buffer maintains a linked list, so writing data means writing into that list. Because Segments are reset and returned to the pool once their data has been read, the write can reuse them instead of creating new Segment objects. After the data has been written to the Buffer, the emitCompleteSegments method is called, which writes the data from the Buffer to the new file.

  @Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }

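Here, sink is the anonymous object created in the Okio class; it writes the data to the new file through an OutputStream.

The overall process is as follows: Okio.source wraps an InputStream, RealBufferedSource reads the data into its Buffer one Segment at a time, RealBufferedSink copies the bytes into its own Buffer, and emitCompleteSegments hands the complete Segments to the Sink, which writes them out through an OutputStream.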

4. Segment and SegmentPool

Segment is a central part of Okio: it is the carrier of the data in a Buffer. Each Segment holds up to 8KB, and the first node of a Buffer’s list is called head.

final class Segment {
  // The maximum size of the Segment is 8KB
  static final int SIZE = 8192;

  // When a split covers at least SHARE_MINIMUM bytes, the new Segment shares the underlying array instead of copying it; a shared Segment cannot be added to the SegmentPool
  static final int SHARE_MINIMUM = 1024;
  // The stored data
  final byte[] data;

  // Start position of the next read
  int pos;

  // Start position of the next write
  int limit;

  // Whether data is shared with other Segments or ByteStrings
  boolean shared;

  // Whether this Segment owns data and may append to it
  boolean owner;

  // Subsequent nodes
  Segment next;

  // The predecessor node
  Segment prev;

  ...

  // Remove the current Segment and return its successor
  public final @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  // Add a new node after the current one
  public final Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

  // Split the current Segment into two Segments: the first covers [pos, pos+byteCount), the second covers [pos+byteCount, limit)
  public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // If the number of bytes is at least SHARE_MINIMUM, split off a shared node instead of copying
    if (byteCount >= SHARE_MINIMUM) {
      prefix = sharedCopy();
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

  // Merge the current Segment into its predecessor prev, then remove the current Segment from the doubly linked list and recycle it into the SegmentPool. The merge only happens if the two Segments together hold no more than 8KB; prev's pos and limit may shift as a result
  public final void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }

  // Move byteCount bytes from the current node to sink
  public final void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}

SegmentPool is a pool of Segments. It maintains a singly linked list of Segments with a capacity of 64KB (8 Segments).

final class SegmentPool {
    // Maximum capacity of SegmentPool
    static final long MAX_SIZE = 64 * 1024; // 64 KiB.

    // Subsequent nodes
    static Segment next;

    // Total number of bytes in the current pool
    static long byteCount;

    private SegmentPool() {}

    // Get a Segment object from the pool
    static Segment take() {
        synchronized (SegmentPool.class) {
            if (next != null) {
                Segment result = next;
                next = result.next;
                result.next = null;
                byteCount -= Segment.SIZE;
                return result;
            }
        }
        return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
    }
    // Reset the Segment's state and put it into the pool
    static void recycle(Segment segment) {
        if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
        if (segment.shared) return; // This segment cannot be recycled.
        synchronized (SegmentPool.class) {
            if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
            byteCount += Segment.SIZE;
            segment.next = next;
            segment.pos = segment.limit = 0;
            next = segment;
        }
    }
}

When reading data from an InputStream, the data is written into a doubly linked list whose nodes are Segments. When the tail Segment has no room left (each Segment holds at most 8KB), a Segment object is taken from the SegmentPool and appended to the tail of the list.

When an OutputStream is used to write data, the data is read from the head node of the doubly linked list. Once all the data in a Segment has been read, the Segment is removed from the list and recycled into the SegmentPool for reuse.
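The pooling behaviour can be tried out in isolation. The sketch below is a hypothetical, instance-based miniature (MiniSegmentPool and Seg are made-up names, and recycle returns a boolean purely so the outcome is observable); it keeps the same 8KB segment size and 64KB cap as the listing above.

```java
// Hypothetical miniature of the pooling idea; it follows the listing's field names but is not Okio's code.
final class MiniSegmentPool {
    static final int SEGMENT_SIZE = 8192;
    static final long MAX_SIZE = 64 * 1024;

    static final class Seg {
        final byte[] data = new byte[SEGMENT_SIZE];
        int pos;   // start position of the next read
        int limit; // start position of the next write
        Seg next;
    }

    private Seg next;       // head of the singly linked list of pooled segments
    private long byteCount; // bytes currently held by the pool

    // Returns a pooled segment if there is one, otherwise a freshly allocated segment.
    synchronized Seg take() {
        if (next == null) return new Seg(); // pool is empty
        Seg result = next;
        next = result.next;
        result.next = null;
        byteCount -= SEGMENT_SIZE;
        return result;
    }

    // Resets the segment and pools it; returns false if the pool is already full.
    synchronized boolean recycle(Seg segment) {
        if (byteCount + SEGMENT_SIZE > MAX_SIZE) return false;
        byteCount += SEGMENT_SIZE;
        segment.pos = segment.limit = 0;
        segment.next = next;
        next = segment;
        return true;
    }

    synchronized long byteCount() {
        return byteCount;
    }
}
```

Recycling a ninth segment is rejected because eight segments already fill the 64KB budget, and take() hands back the most recently recycled segment first.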

5. Timeout mechanism

One of Okio’s highlights is its timeout mechanism, which keeps an I/O operation from blocking indefinitely when something goes wrong. The default timeout mechanism is synchronous. AsyncTimeout is Okio’s implementation of asynchronous timeouts. It is a singly linked list whose nodes are sorted by remaining wait time, shortest first. head is the head node and acts as a placeholder. A background Watchdog thread watches the list; when a node times out, it is removed from the list and the socket is closed.

AsyncTimeout provides three methods, enter, exit, and timedOut, which are invoked when a stream operation starts, ends, and times out, respectively.

public class AsyncTimeout extends Timeout {
    // The head node, used as a placeholder
    static AsyncTimeout head;

    // Whether it is in the linked list
    private boolean inQueue;

    // The successor node
    private AsyncTimeout next;

    // The time at which this node times out
    private long timeoutAt;

    // Add the current AsyncTimeout object to the linked list
    public final void enter() {
        ...
        scheduleTimeout(this, timeoutNanos, hasDeadline);
    }

    private static synchronized void scheduleTimeout(
            AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
        // Create the placeholder head and start the watchdog thread
        if (head == null) {
            head = new AsyncTimeout();
            new Watchdog().start();
        }
        ...
        // Insert into the linked list in order of remaining time; nodes that wait longer come later
        for (AsyncTimeout prev = head; true; prev = prev.next) {
            if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
                node.next = prev.next;
                prev.next = node;
                if (prev == head) {
                    AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
                }
                break;
            }
        }
    }

    // Remove the node from the list
    public final boolean exit() {
        if (!inQueue) return false;
        inQueue = false;
        return cancelScheduledTimeout(this);
    }

    // Perform the actual remove operation
    private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
        // Remove the node from the linked list.
        for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
            if (prev.next == node) {
                prev.next = node.next;
                node.next = null;
                return false;
            }
        }

        // The node wasn't found in the linked list: it must have timed out!
        return true;
    }

    // Override this method in a subclass to close the socket
    protected void timedOut() {}

    // The watchdog thread that waits for nodes to time out
    private static final class Watchdog extends Thread {
        Watchdog() {
            super("Okio Watchdog");
            setDaemon(true);
        }

        public void run() {
            while (true) {
                try {
                    AsyncTimeout timedOut;
                    synchronized (AsyncTimeout.class) {
                        timedOut = awaitTimeout();
                        // null means no node has timed out yet; check again
                        if (timedOut == null) continue;
                        // The queue stayed empty for the whole idle period; stop the watchdog
                        if (timedOut == head) {
                            head = null;
                            return;
                        }
                    }
                    // Close the socket
                    timedOut.timedOut();
                } catch (InterruptedException ignored) {
                }
            }
        }
    }

    
    static AsyncTimeout awaitTimeout() throws InterruptedException {
        AsyncTimeout node = head.next;
        // There is no other node except the header
        if (node == null) {
            long startNanos = System.nanoTime();
            AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
            return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
                    ? head  // The idle timeout elapsed.
                    : null; // The situation has changed.
        }

        long waitNanos = node.remainingNanos(System.nanoTime());

        // The first node has not timed out yet
        if (waitNanos > 0) {
            // Wait until it is due, or until notified of a change
            long waitMillis = waitNanos / 1000000L;
            waitNanos -= (waitMillis * 1000000L);
            AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
            return null;
        }

        // Indicates that the node has timed out
        head.next = node.next;
        node.next = null;
        return node;
    }
}
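The queue discipline used by AsyncTimeout can be sketched on its own. TimeoutQueue below is a hypothetical miniature without a watchdog thread: times are plain long values supplied by the caller, schedule keeps the list sorted with the earliest timeout first, cancel mirrors the return convention of cancelScheduledTimeout, and pollExpired plays the role of the check the watchdog performs on the first node.

```java
// Hypothetical miniature of the sorted timeout queue; not Okio's AsyncTimeout.
final class TimeoutQueue {
    static final class Node {
        final String name;
        final long timeoutAt; // the time at which this node times out
        Node next;

        Node(String name, long timeoutAt) {
            this.name = name;
            this.timeoutAt = timeoutAt;
        }
    }

    private final Node head = new Node("head", 0L); // placeholder; never returned to callers

    // Inserts the node so the list stays ordered by timeoutAt, earliest first.
    synchronized void schedule(Node node) {
        Node prev = head;
        while (prev.next != null && prev.next.timeoutAt <= node.timeoutAt) {
            prev = prev.next;
        }
        node.next = prev.next;
        prev.next = node;
    }

    // Removes the node; returns false if it was still queued, true if it was already gone.
    synchronized boolean cancel(Node node) {
        for (Node prev = head; prev != null; prev = prev.next) {
            if (prev.next == node) {
                prev.next = node.next;
                node.next = null;
                return false;
            }
        }
        return true;
    }

    // Removes and returns the first node whose time has passed, or null if none is due.
    synchronized Node pollExpired(long now) {
        Node first = head.next;
        if (first == null || first.timeoutAt > now) return null;
        head.next = first.next;
        first.next = null;
        return first;
    }
}
```

Because the list is sorted, only the first node ever needs to be inspected to decide whether anything has timed out.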

No timeout is set by default, so we have to set one ourselves. Synchronous and asynchronous timeouts are configured the same way, as in the following code.

    sink.timeout().deadline(1, TimeUnit.SECONDS);
    source.timeout().deadline(1,TimeUnit.MILLISECONDS);
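What a synchronous deadline check amounts to can be shown with a small stand-alone class. MiniDeadline is a hypothetical sketch, not Okio’s Timeout: it stores an absolute deadline on the System.nanoTime() clock, and isReachedAt is an assumed helper that takes the clock reading as a parameter so the check can be exercised deterministically.

```java
import java.io.InterruptedIOException;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of a synchronous deadline check; not Okio's Timeout class.
final class MiniDeadline {
    private boolean hasDeadline;
    private long deadlineNanoTime;

    // Sets a deadline relative to now, in the style of deadline(1, TimeUnit.SECONDS).
    MiniDeadline deadline(long duration, TimeUnit unit) {
        if (duration <= 0) throw new IllegalArgumentException("duration <= 0: " + duration);
        return deadlineNanoTime(System.nanoTime() + unit.toNanos(duration));
    }

    // Sets an absolute deadline on the System.nanoTime() clock.
    MiniDeadline deadlineNanoTime(long nanoTime) {
        hasDeadline = true;
        deadlineNanoTime = nanoTime;
        return this;
    }

    // True if a deadline is set and has passed at the given clock reading.
    boolean isReachedAt(long nowNanos) {
        return hasDeadline && deadlineNanoTime - nowNanos <= 0;
    }

    // Meant to be called before each read or write.
    void throwIfReached() throws InterruptedIOException {
        if (isReachedAt(System.nanoTime())) {
            throw new InterruptedIOException("deadline reached");
        }
    }
}
```

With no deadline set, the check never fails, which matches the statement that no timeout applies by default.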

6. Producer/consumer model

Okio’s Pipe can be used to implement a producer/consumer model. A Pipe maintains a Buffer with a fixed maximum size. When the Buffer is full, the writing thread waits until the Buffer has free space again.

public final class Pipe {
  // The maximum capacity of Pipe
  final long maxBufferSize;
  //Pipe corresponds to Buffer
  final Buffer buffer = new Buffer();
  boolean sinkClosed;
  boolean sourceClosed;
  // Write to the stream, corresponding to the producer
  private final Sink sink = new PipeSink();
  // Read the stream, corresponding to the consumer
  private final Source source = new PipeSource();

  public Pipe(long maxBufferSize) {
    // The maximum capacity cannot be less than 1
    if (maxBufferSize < 1L) {
      throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize);
    }
    this.maxBufferSize = maxBufferSize;
  }
  ...
  // Write data to the Pipe
  final class PipeSink implements Sink {
    final Timeout timeout = new Timeout();

    @Override public void write(Buffer source, long byteCount) throws IOException {
      synchronized (buffer) {
        ...

        while (byteCount > 0) {
          ...
          long bufferSpaceAvailable = maxBufferSize - buffer.size();
          if (bufferSpaceAvailable == 0) {
            // The buffer has no free space; wait for the consumer to drain it
            timeout.waitUntilNotified(buffer); // Wait until the source drains the buffer.
            continue;
          }

          long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount);
          buffer.write(source, bytesToWrite);
          byteCount -= bytesToWrite;
          // Notify buffer that there is new data,
          buffer.notifyAll(); // Notify the source that it can resume reading.
        }
      }
    }
    ...
  }
  // Read data from Pipe
  final class PipeSource implements Source {
    final Timeout timeout = new Timeout();

    @Override public long read(Buffer sink, long byteCount) throws IOException {
      synchronized (buffer) {
        ...

        while (buffer.size() == 0) {
          if (sinkClosed) return -1L;
          // The Pipe is empty; wait for the producer to write data
          timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer.
        }

        long result = buffer.read(sink, byteCount);
        buffer.notifyAll(); // Notify the sink that it can resume writing.
        return result;
      }
    }
    ...
  }
}

The Pipe code is relatively small. Here’s how to use Pipe.

    public void pipe() throws IOException {
        // Set the capacity of Pipe to 1024 bytes, i.e. 1KB
        Pipe pipe = new Pipe(1024);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try (BufferedSource bufferedSource = Okio.buffer(pipe.source())) {
                    // Write the data in Pipe to env4.txt
                    bufferedSource.readAll(Okio.sink(new File("file/env4.txt")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try (BufferedSink bufferedSink = Okio.buffer(pipe.sink())) {
                    // Write data from env3.txt to Pipe
                    bufferedSink.writeAll(Okio.source(new File("file/env3.txt")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
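The wait/notify handshake that Pipe relies on can also be sketched with the JDK alone. MiniPipe below is a hypothetical, byte-at-a-time miniature (not Okio’s Pipe, and without timeouts): the producer blocks while the ring buffer is full, the consumer blocks while it is empty, and transfer is an assumed helper that pushes an array through the pipe using one producer thread.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical miniature of a bounded pipe; not Okio's Pipe.
final class MiniPipe {
    private final byte[] ring;
    private int head;  // index of the next byte to read
    private int count; // bytes currently buffered
    private boolean closed;

    MiniPipe(int maxBufferSize) {
        if (maxBufferSize < 1) throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize);
        ring = new byte[maxBufferSize];
    }

    // Producer side: blocks while the buffer is full.
    synchronized void write(byte b) throws InterruptedException {
        while (count == ring.length) wait(); // wait until the consumer drains the buffer
        ring[(head + count) % ring.length] = b;
        count++;
        notifyAll(); // tell the consumer there is data
    }

    // Consumer side: blocks while the buffer is empty; returns -1 once closed and drained.
    synchronized int read() throws InterruptedException {
        while (count == 0) {
            if (closed) return -1;
            wait(); // wait until the producer writes
        }
        int b = ring[head] & 0xff;
        head = (head + 1) % ring.length;
        count--;
        notifyAll(); // tell the producer there is space
        return b;
    }

    synchronized void close() {
        closed = true;
        notifyAll();
    }

    // Pushes data through a pipe of the given capacity using one producer thread.
    static byte[] transfer(byte[] data, int capacity) {
        MiniPipe pipe = new MiniPipe(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (byte b : data) pipe.write(b);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                pipe.close();
            }
        });
        producer.start();
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try {
            for (int b; (b = pipe.read()) != -1; ) received.write(b);
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.toByteArray();
    }
}
```

Even with a capacity far smaller than the data, every byte arrives in order, because each side simply waits whenever the buffer is full or empty.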

7. Summary

I haven’t covered all of Okio’s features (GZip, for example), but this should be enough for a solid understanding of Okio, which will help a great deal when using it in future development. Note that although Okio works well, it is a wrapper and optimization on top of Java I/O and NIO and does not provide non-blocking I/O. For non-blocking I/O, take a look at the Netty library.

References

Dismantling wheel series: Dismantling Okio

OkHttp Okio source code analysis (3): a walkthrough of the Okio read and write process

Android Learning Notes: Okio

Deep understanding of Okio’s optimization ideas